You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by an...@apache.org on 2015/07/30 11:14:28 UTC

[6/9] activemq-artemis git commit: ARTEMIS-163 First pass on the native AIO refactoring

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCompletion.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCompletion.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCompletion.java
index b85a845..d0140f1 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCompletion.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCompletion.java
@@ -16,7 +16,9 @@
  */
 package org.apache.activemq.artemis.core.journal;
 
-public interface IOCompletion extends IOAsyncTask
+import org.apache.activemq.artemis.core.io.IOCallback;
+
+public interface IOCompletion extends IOCallback
 {
    void storeLineUp();
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCriticalErrorListener.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCriticalErrorListener.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCriticalErrorListener.java
deleted file mode 100644
index fc0bbf9..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/IOCriticalErrorListener.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.journal;
-
-public interface IOCriticalErrorListener
-{
-   void onIOException(Exception code, String message, SequentialFile file);
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
index f3335b0..6b0beab 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/Journal.java
@@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.journal;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.impl.JournalFile;
 import org.apache.activemq.artemis.core.server.ActiveMQComponent;
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/SequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/SequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/SequentialFile.java
deleted file mode 100644
index 34e6d02..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/SequentialFile.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.journal;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.core.journal.impl.TimedBuffer;
-
-public interface SequentialFile
-{
-   /*
-    * Creates the file if it doesn't already exist, then opens it
-    */
-   void open() throws Exception;
-
-   boolean isOpen();
-
-   boolean exists();
-
-   /**
-    * The maximum number of simultaneous writes accepted
-    * @param maxIO
-    * @throws Exception
-    */
-   void open(int maxIO, boolean useExecutor) throws Exception;
-
-   boolean fits(int size);
-
-   int getAlignment() throws Exception;
-
-   int calculateBlockStart(int position) throws Exception;
-
-   String getFileName();
-
-   void fill(int position, int size, byte fillCharacter) throws Exception;
-
-   void delete() throws IOException, InterruptedException, ActiveMQException;
-
-   void write(ActiveMQBuffer bytes, boolean sync, IOAsyncTask callback) throws Exception;
-
-   void write(ActiveMQBuffer bytes, boolean sync) throws Exception;
-
-   void write(EncodingSupport bytes, boolean sync, IOAsyncTask callback) throws Exception;
-
-   void write(EncodingSupport bytes, boolean sync) throws Exception;
-
-   /**
-    * Write directly to the file without using any buffer
-    * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
-    *           NIO). To be safe, use a buffer from the corresponding
-    *           {@link SequentialFileFactory#newBuffer(int)}.
-    */
-   void writeDirect(ByteBuffer bytes, boolean sync, IOAsyncTask callback);
-
-   /**
-    * Write directly to the file without using any buffer
-    * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
-    *           NIO). To be safe, use a buffer from the corresponding
-    *           {@link SequentialFileFactory#newBuffer(int)}.
-    */
-   void writeDirect(ByteBuffer bytes, boolean sync) throws Exception;
-
-   /**
-    * Write directly to the file. This is used by compacting and other places where we write a big
-    * buffer in a single shot. writeInternal should always block until the entire write is sync on
-    * disk.
-    * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
-    *           NIO). To be safe, use a buffer from the corresponding
-    *           {@link SequentialFileFactory#newBuffer(int)}.
-    */
-   void writeInternal(ByteBuffer bytes) throws Exception;
-
-   /**
-    * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
-    *           NIO). To be safe, use a buffer from the corresponding
-    *           {@link SequentialFileFactory#newBuffer(int)}.
-    */
-   int read(ByteBuffer bytes, IOAsyncTask callback) throws Exception;
-
-   /**
-    * @param bytes the ByteBuffer must be compatible with the SequentialFile implementation (AIO or
-    *           NIO). To be safe, use a buffer from the corresponding
-    *           {@link SequentialFileFactory#newBuffer(int)}.
-    */
-   int read(ByteBuffer bytes) throws Exception;
-
-   void position(long pos) throws IOException;
-
-   long position();
-
-   void close() throws Exception;
-
-   void waitForClose() throws Exception;
-
-   void sync() throws IOException;
-
-   long size() throws Exception;
-
-   void renameTo(String newFileName) throws Exception;
-
-   SequentialFile cloneFile();
-
-   void copyTo(SequentialFile newFileName) throws Exception;
-
-   void setTimedBuffer(TimedBuffer buffer);
-
-   /**
-    * Returns a native File of the file underlying this sequential file.
-    */
-   File getJavaFile();
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/SequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/SequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/SequentialFileFactory.java
deleted file mode 100644
index cb47bd9..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/SequentialFileFactory.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.journal;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-/**
- *
- * A SequentialFileFactory
- */
-public interface SequentialFileFactory
-{
-   SequentialFile createSequentialFile(String fileName, int maxIO);
-
-   /**
-    * Lists files that end with the given extension.
-    * <p>
-    * This method inserts a ".' before the extension.
-    * @param extension
-    * @return
-    * @throws Exception
-    */
-   List<String> listFiles(String extension) throws Exception;
-
-   boolean isSupportsCallbacks();
-
-   /** The SequentialFile will call this method when a disk IO Error happens during the live phase. */
-   void onIOError(Exception exception, String message, SequentialFile file);
-
-   /** used for cases where you need direct buffer outside of the journal context.
-    *  This is because the native layer has a method that can be reused in certain cases like paging */
-   ByteBuffer allocateDirectBuffer(int size);
-
-   /** used for cases where you need direct buffer outside of the journal context.
-    *  This is because the native layer has a method that can be reused in certain cases like paging */
-   void releaseDirectBuffer(ByteBuffer buffer);
-
-   /**
-    * Note: You need to release the buffer if is used for reading operations. You don't need to do
-    * it if using writing operations (AIO Buffer Lister will take of writing operations)
-    * @param size
-    * @return the allocated ByteBuffer
-    */
-   ByteBuffer newBuffer(int size);
-
-   void releaseBuffer(ByteBuffer buffer);
-
-   void activateBuffer(SequentialFile file);
-
-   void deactivateBuffer();
-
-   // To be used in tests only
-   ByteBuffer wrapBuffer(byte[] bytes);
-
-   int getAlignment();
-
-   int calculateBlockSize(int bytes);
-
-   File getDirectory();
-
-   void clearBuffer(ByteBuffer buffer);
-
-   void start();
-
-   void stop();
-
-   /**
-    * Creates the directory if it does not exist yet.
-    */
-   void createDirs() throws Exception;
-
-   void flush();
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AIOSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AIOSequentialFile.java
deleted file mode 100644
index acef8a5..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AIOSequentialFile.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.journal.impl;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.Executor;
-
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.core.asyncio.AsynchronousFile;
-import org.apache.activemq.artemis.core.asyncio.BufferCallback;
-import org.apache.activemq.artemis.core.asyncio.IOExceptionListener;
-import org.apache.activemq.artemis.core.asyncio.impl.AsynchronousFileImpl;
-import org.apache.activemq.artemis.core.journal.IOAsyncTask;
-import org.apache.activemq.artemis.core.journal.SequentialFile;
-import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
-
-public class AIOSequentialFile extends AbstractSequentialFile implements IOExceptionListener
-{
-   private boolean opened = false;
-
-   private final int maxIO;
-
-   private AsynchronousFile aioFile;
-
-   private final BufferCallback bufferCallback;
-
-   /** The pool for Thread pollers */
-   private final Executor pollerExecutor;
-
-   public AIOSequentialFile(final SequentialFileFactory factory,
-                            final int bufferSize,
-                            final long bufferTimeoutMilliseconds,
-                            final File directory,
-                            final String fileName,
-                            final int maxIO,
-                            final BufferCallback bufferCallback,
-                            final Executor writerExecutor,
-                            final Executor pollerExecutor)
-   {
-      super(directory, fileName, factory, writerExecutor);
-      this.maxIO = maxIO;
-      this.bufferCallback = bufferCallback;
-      this.pollerExecutor = pollerExecutor;
-   }
-
-   public boolean isOpen()
-   {
-      return opened;
-   }
-
-   public int getAlignment()
-   {
-      checkOpened();
-
-      return aioFile.getBlockSize();
-   }
-
-   public int calculateBlockStart(final int position)
-   {
-      int alignment = getAlignment();
-
-      int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment;
-
-      return pos;
-   }
-
-   public SequentialFile cloneFile()
-   {
-      return new AIOSequentialFile(factory,
-                                   -1,
-                                   -1,
-                                   getFile().getParentFile(),
-                                   getFile().getName(),
-                                   maxIO,
-                                   bufferCallback,
-                                   writerExecutor,
-                                   pollerExecutor);
-   }
-
-   @Override
-   public synchronized void close() throws IOException, InterruptedException, ActiveMQException
-   {
-      if (!opened)
-      {
-         return;
-      }
-
-      super.close();
-
-      opened = false;
-
-      timedBuffer = null;
-
-      aioFile.close();
-      aioFile = null;
-
-      notifyAll();
-   }
-
-   @Override
-   public synchronized void waitForClose() throws Exception
-   {
-      while (isOpen())
-      {
-         wait();
-      }
-   }
-
-   public void fill(final int position, final int size, final byte fillCharacter) throws Exception
-   {
-      checkOpened();
-
-      int fileblockSize = aioFile.getBlockSize();
-
-      int blockSize = fileblockSize;
-
-      if (size % (100 * 1024 * 1024) == 0)
-      {
-         blockSize = 100 * 1024 * 1024;
-      }
-      else if (size % (10 * 1024 * 1024) == 0)
-      {
-         blockSize = 10 * 1024 * 1024;
-      }
-      else if (size % (1024 * 1024) == 0)
-      {
-         blockSize = 1024 * 1024;
-      }
-      else if (size % (10 * 1024) == 0)
-      {
-         blockSize = 10 * 1024;
-      }
-      else
-      {
-         blockSize = fileblockSize;
-      }
-
-      int blocks = size / blockSize;
-
-      if (size % blockSize != 0)
-      {
-         blocks++;
-      }
-
-      int filePosition = position;
-
-      if (position % fileblockSize != 0)
-      {
-         filePosition = (position / fileblockSize + 1) * fileblockSize;
-      }
-
-      aioFile.fill(filePosition, blocks, blockSize, fillCharacter);
-
-      fileSize = aioFile.size();
-   }
-
-   public void open() throws Exception
-   {
-      open(maxIO, true);
-   }
-
-   public synchronized void open(final int maxIO, final boolean useExecutor) throws ActiveMQException
-   {
-      opened = true;
-
-      aioFile = new AsynchronousFileImpl(useExecutor ? writerExecutor : null, pollerExecutor, this);
-
-      try
-      {
-         aioFile.open(getFile().getAbsolutePath(), maxIO);
-      }
-      catch (ActiveMQException e)
-      {
-         factory.onIOError(e, e.getMessage(), this);
-         throw e;
-      }
-
-      position.set(0);
-
-      aioFile.setBufferCallback(bufferCallback);
-
-      fileSize = aioFile.size();
-   }
-
-   public void setBufferCallback(final BufferCallback callback)
-   {
-      aioFile.setBufferCallback(callback);
-   }
-
-   public int read(final ByteBuffer bytes, final IOAsyncTask callback) throws ActiveMQException
-   {
-      int bytesToRead = bytes.limit();
-
-      long positionToRead = position.getAndAdd(bytesToRead);
-
-      bytes.rewind();
-
-      aioFile.read(positionToRead, bytesToRead, bytes, callback);
-
-      return bytesToRead;
-   }
-
-   public int read(final ByteBuffer bytes) throws Exception
-   {
-      SimpleWaitIOCallback waitCompletion = new SimpleWaitIOCallback();
-
-      int bytesRead = read(bytes, waitCompletion);
-
-      waitCompletion.waitCompletion();
-
-      return bytesRead;
-   }
-
-   public void sync()
-   {
-      throw new UnsupportedOperationException("This method is not supported on AIO");
-   }
-
-   public long size() throws Exception
-   {
-      if (aioFile == null)
-      {
-         return getFile().length();
-      }
-      else
-      {
-         return aioFile.size();
-      }
-   }
-
-   @Override
-   public String toString()
-   {
-      return "AIOSequentialFile:" + getFile().getAbsolutePath();
-   }
-
-   // Public methods
-   // -----------------------------------------------------------------------------------------------------
-
-   @Override
-   public void onIOException(Exception code, String message)
-   {
-      factory.onIOError(code, message, this);
-   }
-
-
-   public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception
-   {
-      if (sync)
-      {
-         SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
-
-         writeDirect(bytes, true, completion);
-
-         completion.waitCompletion();
-      }
-      else
-      {
-         writeDirect(bytes, false, DummyCallback.getInstance());
-      }
-   }
-
-   /**
-    *
-    * @param sync Not used on AIO
-    *  */
-   public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback)
-   {
-      final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
-
-      final long positionToWrite = position.getAndAdd(bytesToWrite);
-
-      aioFile.write(positionToWrite, bytesToWrite, bytes, callback);
-   }
-
-   public void writeInternal(final ByteBuffer bytes) throws ActiveMQException
-   {
-      final int bytesToWrite = factory.calculateBlockSize(bytes.limit());
-
-      final long positionToWrite = position.getAndAdd(bytesToWrite);
-
-      aioFile.writeInternal(positionToWrite, bytesToWrite, bytes);
-   }
-
-   // Protected methods
-   // -----------------------------------------------------------------------------------------------------
-
-   @Override
-   protected ByteBuffer newBuffer(int size, int limit)
-   {
-      size = factory.calculateBlockSize(size);
-      limit = factory.calculateBlockSize(limit);
-
-      ByteBuffer buffer = factory.newBuffer(size);
-      buffer.limit(limit);
-      return buffer;
-   }
-
-   // Private methods
-   // -----------------------------------------------------------------------------------------------------
-
-   private void checkOpened()
-   {
-      if (aioFile == null || !opened)
-      {
-         throw new IllegalStateException("File not opened");
-      }
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AIOSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AIOSequentialFileFactory.java
deleted file mode 100644
index 65e6a6f..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AIOSequentialFileFactory.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.journal.impl;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
-import org.apache.activemq.artemis.core.asyncio.BufferCallback;
-import org.apache.activemq.artemis.core.asyncio.impl.AsynchronousFileImpl;
-import org.apache.activemq.artemis.core.journal.IOCriticalErrorListener;
-import org.apache.activemq.artemis.core.journal.SequentialFile;
-import org.apache.activemq.artemis.core.libaio.Native;
-import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
-import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
-
-public final class AIOSequentialFileFactory extends AbstractSequentialFileFactory
-{
-   private static final boolean trace = ActiveMQJournalLogger.LOGGER.isTraceEnabled();
-
-   private final ReuseBuffersController buffersControl = new ReuseBuffersController();
-
-   private ExecutorService pollerExecutor;
-
-   // This method exists just to make debug easier.
-   // I could replace log.trace by log.info temporarily while I was debugging
-   // Journal
-   private static void trace(final String message)
-   {
-      ActiveMQJournalLogger.LOGGER.trace(message);
-   }
-
-   public AIOSequentialFileFactory(final File journalDir)
-   {
-      this(journalDir,
-           JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
-           JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO,
-           false,
-           null);
-   }
-
-   public AIOSequentialFileFactory(final File journalDir, final IOCriticalErrorListener listener)
-   {
-      this(journalDir,
-           JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO,
-           JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO,
-           false,
-           listener);
-   }
-
-   public AIOSequentialFileFactory(final File journalDir,
-                                   final int bufferSize,
-                                   final int bufferTimeout,
-                                   final boolean logRates)
-   {
-      this(journalDir, bufferSize, bufferTimeout, logRates, null);
-   }
-
-   public AIOSequentialFileFactory(final File journalDir,
-                                   final int bufferSize,
-                                   final int bufferTimeout,
-                                   final boolean logRates,
-                                   final IOCriticalErrorListener listener)
-   {
-      super(journalDir, true, bufferSize, bufferTimeout, logRates, listener);
-   }
-
-   public SequentialFile createSequentialFile(final String fileName, final int maxIO)
-   {
-      return new AIOSequentialFile(this,
-                                   bufferSize,
-                                   bufferTimeout,
-                                   journalDir,
-                                   fileName,
-                                   maxIO,
-                                   buffersControl.callback,
-                                   writeExecutor,
-                                   pollerExecutor);
-   }
-
-   public boolean isSupportsCallbacks()
-   {
-      return true;
-   }
-
-   public static boolean isSupported()
-   {
-      return AsynchronousFileImpl.isLoaded();
-   }
-
-   public ByteBuffer allocateDirectBuffer(final int size)
-   {
-
-      int blocks = size / 512;
-      if (size % 512 != 0)
-      {
-         blocks++;
-      }
-
-      // The buffer on AIO has to be a multiple of 512
-      ByteBuffer buffer = AsynchronousFileImpl.newBuffer(blocks * 512);
-
-      buffer.limit(size);
-
-      return buffer;
-   }
-
-   public void releaseDirectBuffer(final ByteBuffer buffer)
-   {
-      Native.destroyBuffer(buffer);
-   }
-
-   public ByteBuffer newBuffer(int size)
-   {
-      if (size % 512 != 0)
-      {
-         size = (size / 512 + 1) * 512;
-      }
-
-      return buffersControl.newBuffer(size);
-   }
-
-   public void clearBuffer(final ByteBuffer directByteBuffer)
-   {
-      AsynchronousFileImpl.clearBuffer(directByteBuffer);
-   }
-
-   public int getAlignment()
-   {
-      return 512;
-   }
-
-   // For tests only
-   public ByteBuffer wrapBuffer(final byte[] bytes)
-   {
-      ByteBuffer newbuffer = newBuffer(bytes.length);
-      newbuffer.put(bytes);
-      return newbuffer;
-   }
-
-   public int calculateBlockSize(final int position)
-   {
-      int alignment = getAlignment();
-
-      int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment;
-
-      return pos;
-   }
-
-   /* (non-Javadoc)
-    * @see org.apache.activemq.artemis.core.journal.SequentialFileFactory#releaseBuffer(java.nio.ByteBuffer)
-    */
-   @Override
-   public synchronized void releaseBuffer(final ByteBuffer buffer)
-   {
-      Native.destroyBuffer(buffer);
-   }
-
-   @Override
-   public void start()
-   {
-      super.start();
-
-      pollerExecutor = Executors.newCachedThreadPool(new ActiveMQThreadFactory("ActiveMQ-AIO-poller-pool" + System.identityHashCode(this),
-                                                                              true,
-                                                                              AIOSequentialFileFactory.getThisClassLoader()));
-
-   }
-
-   @Override
-   public void stop()
-   {
-      buffersControl.stop();
-
-      if (pollerExecutor != null)
-      {
-         pollerExecutor.shutdown();
-
-         try
-         {
-            if (!pollerExecutor.awaitTermination(AbstractSequentialFileFactory.EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
-            {
-               ActiveMQJournalLogger.LOGGER.timeoutOnPollerShutdown(new Exception("trace"));
-            }
-         }
-         catch (InterruptedException e)
-         {
-            throw new ActiveMQInterruptedException(e);
-         }
-      }
-
-      super.stop();
-   }
-
-   @Override
-   protected void finalize()
-   {
-      stop();
-   }
-
-   /**
-    * Class that will control buffer-reuse
-    */
-   private class ReuseBuffersController
-   {
-      private volatile long bufferReuseLastTime = System.currentTimeMillis();
-
-      /**
-       * This queue is fed by {@link org.apache.activemq.artemis.core.journal.impl.AIOSequentialFileFactory.ReuseBuffersController.LocalBufferCallback}
-       * which is called directly by NIO or NIO. On the case of the AIO this is almost called by the native layer as
-       * soon as the buffer is not being used any more and ready to be reused or GCed
-       */
-      private final ConcurrentLinkedQueue<ByteBuffer> reuseBuffersQueue = new ConcurrentLinkedQueue<ByteBuffer>();
-
-      private boolean stopped = false;
-
-      final BufferCallback callback = new LocalBufferCallback();
-
-      public ByteBuffer newBuffer(final int size)
-      {
-         // if a new buffer wasn't requested in 10 seconds, we clear the queue
-         // This is being done this way as we don't need another Timeout Thread
-         // just to cleanup this
-         if (bufferSize > 0 && System.currentTimeMillis() - bufferReuseLastTime > 10000)
-         {
-            if (AIOSequentialFileFactory.trace)
-            {
-               AIOSequentialFileFactory.trace("Clearing reuse buffers queue with " + reuseBuffersQueue.size() +
-                                                 " elements");
-            }
-
-            bufferReuseLastTime = System.currentTimeMillis();
-
-            clearPoll();
-         }
-
-         // if a buffer is bigger than the configured-bufferSize, we just create a new
-         // buffer.
-         if (size > bufferSize)
-         {
-            return AsynchronousFileImpl.newBuffer(size);
-         }
-         else
-         {
-            // We need to allocate buffers following the rules of the storage
-            // being used (AIO/NIO)
-            int alignedSize = calculateBlockSize(size);
-
-            // Try getting a buffer from the queue...
-            ByteBuffer buffer = reuseBuffersQueue.poll();
-
-            if (buffer == null)
-            {
-               // if empty create a new one.
-               buffer = AsynchronousFileImpl.newBuffer(bufferSize);
-
-               buffer.limit(alignedSize);
-            }
-            else
-            {
-               clearBuffer(buffer);
-
-               // set the limit of the buffer to the bufferSize being required
-               buffer.limit(alignedSize);
-            }
-
-            buffer.rewind();
-
-            return buffer;
-         }
-      }
-
-      public synchronized void stop()
-      {
-         stopped = true;
-         clearPoll();
-      }
-
-      public synchronized void clearPoll()
-      {
-         ByteBuffer reusedBuffer;
-
-         while ((reusedBuffer = reuseBuffersQueue.poll()) != null)
-         {
-            releaseBuffer(reusedBuffer);
-         }
-      }
-
-      private class LocalBufferCallback implements BufferCallback
-      {
-         public void bufferDone(final ByteBuffer buffer)
-         {
-            synchronized (ReuseBuffersController.this)
-            {
-
-               if (stopped)
-               {
-                  releaseBuffer(buffer);
-               }
-               else
-               {
-                  bufferReuseLastTime = System.currentTimeMillis();
-
-                  // If a buffer has any other than the configured bufferSize, the buffer
-                  // will be just sent to GC
-                  if (buffer.capacity() == bufferSize)
-                  {
-                     reuseBuffersQueue.offer(buffer);
-                  }
-                  else
-                  {
-                     releaseBuffer(buffer);
-                  }
-               }
-            }
-         }
-      }
-   }
-
-   private static ClassLoader getThisClassLoader()
-   {
-      return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
-      {
-         public ClassLoader run()
-         {
-            return AIOSequentialFileFactory.class.getClassLoader();
-         }
-      });
-
-   }
-
-   @Override
-   public String toString()
-   {
-      return AIOSequentialFileFactory.class.getSimpleName() + "(buffersControl.stopped=" + buffersControl.stopped +
-         "):" + super.toString();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
index e21b046..b36a0c4 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractJournalUpdateTask.java
@@ -24,8 +24,8 @@ import java.util.Set;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.Pair;
-import org.apache.activemq.artemis.core.journal.SequentialFile;
-import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalInternalRecord;
@@ -87,7 +87,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
                                                  final List<Pair<String, String>> renames) throws Exception
    {
 
-      SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL, 1);
+      SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL);
 
       try
       {
@@ -182,7 +182,7 @@ public abstract class AbstractJournalUpdateTask implements JournalReaderCallback
          // To Fix the size of the file
          writingChannel.writerIndex(writingChannel.capacity());
 
-         sequentialFile.writeInternal(writingChannel.toByteBuffer());
+         sequentialFile.writeDirect(writingChannel.toByteBuffer(), true);
          sequentialFile.close();
          newDataFiles.add(currentFile);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractSequentialFile.java
deleted file mode 100644
index a4eed58..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractSequentialFile.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.journal.impl;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
-import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
-import org.apache.activemq.artemis.core.journal.EncodingSupport;
-import org.apache.activemq.artemis.core.journal.IOAsyncTask;
-import org.apache.activemq.artemis.core.journal.SequentialFile;
-import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
-import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
-import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
-
-public abstract class AbstractSequentialFile implements SequentialFile
-{
-
-   private File file;
-
-   protected final File directory;
-
-   protected final SequentialFileFactory factory;
-
-   protected long fileSize = 0;
-
-   protected final AtomicLong position = new AtomicLong(0);
-
-   protected TimedBuffer timedBuffer;
-
-   /**
-    * Instead of having AIOSequentialFile implementing the Observer, I have done it on an inner class.
-    * This is the class returned to the factory when the file is being activated.
-    */
-   protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver();
-
-   /**
-    * Used for asynchronous writes
-    */
-   protected final Executor writerExecutor;
-
-   /**
-    * @param file
-    * @param directory
-    */
-   public AbstractSequentialFile(final File directory,
-                                 final String file,
-                                 final SequentialFileFactory factory,
-                                 final Executor writerExecutor)
-   {
-      super();
-      this.file = new File(directory, file);
-      this.directory = directory;
-      this.factory = factory;
-      this.writerExecutor = writerExecutor;
-   }
-
-   // Public --------------------------------------------------------
-
-   public final boolean exists()
-   {
-      return file.exists();
-   }
-
-   public final String getFileName()
-   {
-      return file.getName();
-   }
-
-   public final void delete() throws IOException, InterruptedException, ActiveMQException
-   {
-      if (isOpen())
-      {
-         close();
-      }
-
-      if (file.exists() && !file.delete())
-      {
-         ActiveMQJournalLogger.LOGGER.errorDeletingFile(this);
-      }
-   }
-
-   public void copyTo(SequentialFile newFileName) throws Exception
-   {
-      try
-      {
-         ActiveMQJournalLogger.LOGGER.debug("Copying " + this + " as " + newFileName);
-         if (!newFileName.isOpen())
-         {
-            newFileName.open();
-         }
-
-         if (!isOpen())
-         {
-            this.open();
-         }
-
-
-         ByteBuffer buffer = ByteBuffer.allocate(10 * 1024);
-
-         for (;;)
-         {
-            buffer.rewind();
-            int size = this.read(buffer);
-            newFileName.writeDirect(buffer, false);
-            if (size < 10 * 1024)
-            {
-               break;
-            }
-         }
-         newFileName.close();
-         this.close();
-      }
-      catch (IOException e)
-      {
-         factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
-         throw e;
-      }
-   }
-
-   /**
-    * @throws IOException only declare exception due to signature. Sub-class needs it.
-    */
-   @Override
-   public void position(final long pos) throws IOException
-   {
-      position.set(pos);
-   }
-
-   public long position()
-   {
-      return position.get();
-   }
-
-   public final void renameTo(final String newFileName) throws IOException, InterruptedException,
-      ActiveMQException
-   {
-      try
-      {
-         close();
-      }
-      catch (IOException e)
-      {
-         factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
-         throw e;
-      }
-
-      File newFile = new File(directory + "/" + newFileName);
-
-      if (!file.equals(newFile))
-      {
-         if (!file.renameTo(newFile))
-         {
-            throw ActiveMQJournalBundle.BUNDLE.ioRenameFileError(file.getName(), newFileName);
-         }
-         file = newFile;
-      }
-   }
-
-   /**
-    * @throws IOException      we declare throwing IOException because sub-classes need to do it
-    * @throws ActiveMQException
-    */
-   public synchronized void close() throws IOException, InterruptedException, ActiveMQException
-   {
-      final CountDownLatch donelatch = new CountDownLatch(1);
-
-      if (writerExecutor != null)
-      {
-         writerExecutor.execute(new Runnable()
-         {
-            public void run()
-            {
-               donelatch.countDown();
-            }
-         });
-
-         while (!donelatch.await(60, TimeUnit.SECONDS))
-         {
-            ActiveMQJournalLogger.LOGGER.couldNotCompleteTask(new Exception("trace"), file.getName());
-         }
-      }
-   }
-
-   public final boolean fits(final int size)
-   {
-      if (timedBuffer == null)
-      {
-         return position.get() + size <= fileSize;
-      }
-      else
-      {
-         return timedBuffer.checkSize(size);
-      }
-   }
-
-   public void setTimedBuffer(final TimedBuffer buffer)
-   {
-      if (timedBuffer != null)
-      {
-         timedBuffer.setObserver(null);
-      }
-
-      timedBuffer = buffer;
-
-      if (buffer != null)
-      {
-         buffer.setObserver(timedBufferObserver);
-      }
-
-   }
-
-   public void write(final ActiveMQBuffer bytes, final boolean sync, final IOAsyncTask callback) throws IOException
-   {
-      if (timedBuffer != null)
-      {
-         bytes.setIndex(0, bytes.capacity());
-         timedBuffer.addBytes(bytes, sync, callback);
-      }
-      else
-      {
-         ByteBuffer buffer = factory.newBuffer(bytes.capacity());
-         buffer.put(bytes.toByteBuffer().array());
-         buffer.rewind();
-         writeDirect(buffer, sync, callback);
-      }
-   }
-
-   public void write(final ActiveMQBuffer bytes, final boolean sync) throws IOException, InterruptedException,
-      ActiveMQException
-   {
-      if (sync)
-      {
-         SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
-
-         write(bytes, true, completion);
-
-         completion.waitCompletion();
-      }
-      else
-      {
-         write(bytes, false, DummyCallback.getInstance());
-      }
-   }
-
-   public void write(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback)
-   {
-      if (timedBuffer != null)
-      {
-         timedBuffer.addBytes(bytes, sync, callback);
-      }
-      else
-      {
-         ByteBuffer buffer = factory.newBuffer(bytes.getEncodeSize());
-
-         // If not using the TimedBuffer, a final copy is necessary
-         // Because AIO will need a specific Buffer
-         // And NIO will also need a whole buffer to perform the write
-
-         ActiveMQBuffer outBuffer = ActiveMQBuffers.wrappedBuffer(buffer);
-         bytes.encode(outBuffer);
-         buffer.rewind();
-         writeDirect(buffer, sync, callback);
-      }
-   }
-
-   public void write(final EncodingSupport bytes, final boolean sync) throws InterruptedException, ActiveMQException
-   {
-      if (sync)
-      {
-         SimpleWaitIOCallback completion = new SimpleWaitIOCallback();
-
-         write(bytes, true, completion);
-
-         completion.waitCompletion();
-      }
-      else
-      {
-         write(bytes, false, DummyCallback.getInstance());
-      }
-   }
-
-   protected File getFile()
-   {
-      return file;
-   }
-
-   private static final class DelegateCallback implements IOAsyncTask
-   {
-      final List<IOAsyncTask> delegates;
-
-      private DelegateCallback(final List<IOAsyncTask> delegates)
-      {
-         this.delegates = delegates;
-      }
-
-      public void done()
-      {
-         for (IOAsyncTask callback : delegates)
-         {
-            try
-            {
-               callback.done();
-            }
-            catch (Throwable e)
-            {
-               ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e);
-            }
-         }
-      }
-
-      public void onError(final int errorCode, final String errorMessage)
-      {
-         for (IOAsyncTask callback : delegates)
-         {
-            try
-            {
-               callback.onError(errorCode, errorMessage);
-            }
-            catch (Throwable e)
-            {
-               ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e);
-            }
-         }
-      }
-   }
-
-   protected ByteBuffer newBuffer(int size, int limit)
-   {
-      size = factory.calculateBlockSize(size);
-      limit = factory.calculateBlockSize(limit);
-
-      ByteBuffer buffer = factory.newBuffer(size);
-      buffer.limit(limit);
-      return buffer;
-   }
-
-   protected class LocalBufferObserver implements TimedBufferObserver
-   {
-      public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOAsyncTask> callbacks)
-      {
-         buffer.flip();
-
-         if (buffer.limit() == 0)
-         {
-            factory.releaseBuffer(buffer);
-         }
-         else
-         {
-            writeDirect(buffer, requestedSync, new DelegateCallback(callbacks));
-         }
-      }
-
-      public ByteBuffer newBuffer(final int size, final int limit)
-      {
-         return AbstractSequentialFile.this.newBuffer(size, limit);
-      }
-
-      public int getRemainingBytes()
-      {
-         if (fileSize - position.get() > Integer.MAX_VALUE)
-         {
-            return Integer.MAX_VALUE;
-         }
-         else
-         {
-            return (int)(fileSize - position.get());
-         }
-      }
-
-      @Override
-      public String toString()
-      {
-         return "TimedBufferObserver on file (" + getFile().getName() + ")";
-      }
-
-   }
-
-   @Override
-   public File getJavaFile()
-   {
-      return getFile().getAbsoluteFile();
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractSequentialFileFactory.java
deleted file mode 100644
index ec0ab4d..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/AbstractSequentialFileFactory.java
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.journal.impl;
-
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.artemis.api.core.ActiveMQInterruptedException;
-import org.apache.activemq.artemis.core.journal.IOCriticalErrorListener;
-import org.apache.activemq.artemis.core.journal.SequentialFile;
-import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
-import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
-import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
-
-/**
- * An abstract SequentialFileFactory containing basic functionality for both AIO and NIO SequentialFactories
- */
-abstract class AbstractSequentialFileFactory implements SequentialFileFactory
-{
-
-   // Timeout used to wait executors to shutdown
-   protected static final int EXECUTOR_TIMEOUT = 60;
-
-   protected final File journalDir;
-
-   protected final TimedBuffer timedBuffer;
-
-   protected final int bufferSize;
-
-   protected final long bufferTimeout;
-
-   private final IOCriticalErrorListener critialErrorListener;
-
-   /**
-    * Asynchronous writes need to be done at another executor.
-    * This needs to be done at NIO, or else we would have the callers thread blocking for the return.
-    * At AIO this is necessary as context switches on writes would fire flushes at the kernel.
-    *  */
-   protected ExecutorService writeExecutor;
-
-   AbstractSequentialFileFactory(final File journalDir,
-                                        final boolean buffered,
-                                        final int bufferSize,
-                                        final int bufferTimeout,
-                                        final boolean logRates,
-                                        final IOCriticalErrorListener criticalErrorListener)
-   {
-      this.journalDir = journalDir;
-
-      if (buffered)
-      {
-         timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, logRates);
-      }
-      else
-      {
-         timedBuffer = null;
-      }
-      this.bufferSize = bufferSize;
-      this.bufferTimeout = bufferTimeout;
-      this.critialErrorListener = criticalErrorListener;
-   }
-
-   public void stop()
-   {
-      if (timedBuffer != null)
-      {
-         timedBuffer.stop();
-      }
-
-      if (isSupportsCallbacks() && writeExecutor != null)
-      {
-         writeExecutor.shutdown();
-
-         try
-         {
-            if (!writeExecutor.awaitTermination(AbstractSequentialFileFactory.EXECUTOR_TIMEOUT, TimeUnit.SECONDS))
-            {
-               ActiveMQJournalLogger.LOGGER.timeoutOnWriterShutdown(new Exception("trace"));
-            }
-         }
-         catch (InterruptedException e)
-         {
-            throw new ActiveMQInterruptedException(e);
-         }
-      }
-   }
-
-   @Override
-   public File getDirectory()
-   {
-      return journalDir;
-   }
-
-   public void start()
-   {
-      if (timedBuffer != null)
-      {
-         timedBuffer.start();
-      }
-
-      if (isSupportsCallbacks())
-      {
-         writeExecutor = Executors.newSingleThreadExecutor(new ActiveMQThreadFactory("ActiveMQ-Asynchronous-Persistent-Writes" + System.identityHashCode(this),
-                                                                                    true,
-                                                                                    AbstractSequentialFileFactory.getThisClassLoader()));
-      }
-
-   }
-
-   @Override
-   public void onIOError(Exception exception, String message, SequentialFile file)
-   {
-      if (critialErrorListener != null)
-      {
-         critialErrorListener.onIOException(exception, message, file);
-      }
-   }
-
-   @Override
-   public void activateBuffer(final SequentialFile file)
-   {
-      if (timedBuffer != null)
-      {
-         file.setTimedBuffer(timedBuffer);
-      }
-   }
-
-   public void flush()
-   {
-      if (timedBuffer != null)
-      {
-         timedBuffer.flush();
-      }
-   }
-
-   public void deactivateBuffer()
-   {
-      if (timedBuffer != null)
-      {
-         // When moving to a new file, we need to make sure any pending buffer will be transferred to the buffer
-         timedBuffer.flush();
-         timedBuffer.setObserver(null);
-      }
-   }
-
-   public void releaseBuffer(final ByteBuffer buffer)
-   {
-   }
-
-   /**
-    * Create the directory if it doesn't exist yet
-    */
-   public void createDirs() throws Exception
-   {
-      boolean ok = journalDir.mkdirs();
-      if (!ok)
-      {
-         throw new IOException("Failed to create directory " + journalDir);
-      }
-   }
-
-   public List<String> listFiles(final String extension) throws Exception
-   {
-      FilenameFilter fnf = new FilenameFilter()
-      {
-         public boolean accept(final File file, final String name)
-         {
-            return name.endsWith("." + extension);
-         }
-      };
-
-      String[] fileNames = journalDir.list(fnf);
-
-      if (fileNames == null)
-      {
-         return Collections.EMPTY_LIST;
-      }
-
-      return Arrays.asList(fileNames);
-   }
-
-   private static ClassLoader getThisClassLoader()
-   {
-      return AccessController.doPrivileged(new PrivilegedAction<ClassLoader>()
-      {
-         public ClassLoader run()
-         {
-            return AbstractSequentialFileFactory.class.getClassLoader();
-         }
-      });
-
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/DummyCallback.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/DummyCallback.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/DummyCallback.java
deleted file mode 100644
index 9c4c3d6..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/DummyCallback.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.journal.impl;
-
-import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
-
-class DummyCallback extends SyncIOCompletion
-{
-   private static final DummyCallback instance = new DummyCallback();
-
-   public static DummyCallback getInstance()
-   {
-      return DummyCallback.instance;
-   }
-
-   public void done()
-   {
-   }
-
-   public void onError(final int errorCode, final String errorMessage)
-   {
-      ActiveMQJournalLogger.LOGGER.errorWritingData(new Exception(errorMessage), errorMessage, errorCode);
-   }
-
-   @Override
-   public void waitCompletion() throws Exception
-   {
-   }
-
-   @Override
-   public void storeLineUp()
-   {
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
index a41e72f..5a0f11f 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/FileWrapperJournal.java
@@ -32,7 +32,7 @@ import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
 import org.apache.activemq.artemis.core.journal.LoaderCallback;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
-import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecordTX;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
index d6a856a..1ba8f0b 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalBase.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.journal.impl;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.io.DummyCallback;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
 import org.apache.activemq.artemis.core.journal.IOCompletion;
 import org.apache.activemq.artemis.core.journal.Journal;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
index 5b0a1b8..1f657a2 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalCompactor.java
@@ -28,8 +28,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
-import org.apache.activemq.artemis.core.journal.SequentialFile;
-import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecord;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.JournalAddRecordTX;
@@ -64,7 +64,7 @@ public class JournalCompactor extends AbstractJournalUpdateTask implements Journ
                                                 final List<String> newFiles,
                                                 final List<Pair<String, String>> renameFile) throws Exception
    {
-      SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL, 1);
+      SequentialFile controlFile = fileFactory.createSequentialFile(AbstractJournalUpdateTask.FILE_COMPACT_CONTROL);
 
       if (controlFile.exists())
       {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java
index e3b1624..dcfc1a2 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFile.java
@@ -16,7 +16,7 @@
  */
 package org.apache.activemq.artemis.core.journal.impl;
 
-import org.apache.activemq.artemis.core.journal.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFile;
 
 public interface JournalFile
 {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java
index 4438a96..7e96575 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFileImpl.java
@@ -21,7 +21,7 @@ import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.activemq.artemis.core.journal.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFile;
 
 public class JournalFileImpl implements JournalFile
 {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java
index 052dc25..268a23d 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalFilesRepository.java
@@ -31,8 +31,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.activemq.artemis.core.journal.SequentialFile;
-import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
 
 /**
@@ -662,13 +662,13 @@ public class JournalFilesRepository
 
       String tmpFileName = fileName + ".tmp";
 
-      SequentialFile sequentialFile = fileFactory.createSequentialFile(tmpFileName, maxAIO);
+      SequentialFile sequentialFile = fileFactory.createSequentialFile(tmpFileName);
 
       sequentialFile.open(1, false);
 
       if (init)
       {
-         sequentialFile.fill(0, fileSize, JournalImpl.FILL_CHARACTER);
+         sequentialFile.fill(fileSize);
 
          JournalImpl.initFileHeader(fileFactory, sequentialFile, userVersion, fileID);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
index 6f411eb..068e697 100644
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
+++ b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/JournalImpl.java
@@ -46,15 +46,15 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
 import org.apache.activemq.artemis.api.core.Pair;
+import org.apache.activemq.artemis.core.io.IOCallback;
 import org.apache.activemq.artemis.core.journal.EncodingSupport;
-import org.apache.activemq.artemis.core.journal.IOAsyncTask;
 import org.apache.activemq.artemis.core.journal.IOCompletion;
 import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
 import org.apache.activemq.artemis.core.journal.LoaderCallback;
 import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
 import org.apache.activemq.artemis.core.journal.RecordInfo;
-import org.apache.activemq.artemis.core.journal.SequentialFile;
-import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
+import org.apache.activemq.artemis.core.io.SequentialFile;
+import org.apache.activemq.artemis.core.io.SequentialFileFactory;
 import org.apache.activemq.artemis.core.journal.TestableJournal;
 import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
 import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
@@ -293,7 +293,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
       final CountDownLatch latch = new CountDownLatch(numIts * 2);
 
-      class MyIOAsyncTask implements IOCompletion
+      class MyAIOCallback implements IOCompletion
       {
          public void done()
          {
@@ -310,7 +310,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          }
       }
 
-      final MyIOAsyncTask task = new MyIOAsyncTask();
+      final MyAIOCallback task = new MyAIOCallback();
 
       final int recordSize = 1024;
 
@@ -373,11 +373,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
       for (String fileName : fileNames)
       {
-         SequentialFile file = fileFactory.createSequentialFile(fileName, filesRepository.getMaxAIO());
+         SequentialFile file = fileFactory.createSequentialFile(fileName);
 
          if (file.size() >= SIZE_HEADER)
          {
-            file.open(1, false);
+            file.open();
 
             try
             {
@@ -2776,11 +2776,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
                                     final boolean completeTransaction,
                                     final boolean sync,
                                     final JournalTransaction tx,
-                                    final IOAsyncTask parameterCallback) throws Exception
+                                    final IOCallback parameterCallback) throws Exception
    {
       checkJournalIsLoaded();
 
-      final IOAsyncTask callback;
+      final IOCallback callback;
 
       final int size = encoder.getEncodeSize();
 
@@ -2896,7 +2896,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
       {
          for (String dataFile : dataFiles)
          {
-            SequentialFile file = fileFactory.createSequentialFile(dataFile, 1);
+            SequentialFile file = fileFactory.createSequentialFile(dataFile);
             if (file.exists())
             {
                file.delete();
@@ -2905,7 +2905,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
          for (String newFile : newFiles)
          {
-            SequentialFile file = fileFactory.createSequentialFile(newFile, 1);
+            SequentialFile file = fileFactory.createSequentialFile(newFile);
             if (file.exists())
             {
                final String originalName = file.getFileName();
@@ -2916,8 +2916,8 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
 
          for (Pair<String, String> rename : renames)
          {
-            SequentialFile fileTmp = fileFactory.createSequentialFile(rename.getA(), 1);
-            SequentialFile fileTo = fileFactory.createSequentialFile(rename.getB(), 1);
+            SequentialFile fileTmp = fileFactory.createSequentialFile(rename.getA());
+            SequentialFile fileTo = fileFactory.createSequentialFile(rename.getB());
             // We should do the rename only if the tmp file still exist, or else we could
             // delete a valid file depending on where the crash occurred during the control file delete
             if (fileTmp.exists())
@@ -2951,7 +2951,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
          for (String fileToDelete : leftFiles)
          {
             ActiveMQJournalLogger.LOGGER.deletingOrphanedFile(fileToDelete);
-            SequentialFile file = fileFactory.createSequentialFile(fileToDelete, 1);
+            SequentialFile file = fileFactory.createSequentialFile(fileToDelete);
             file.delete();
          }
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/NIOSequentialFile.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/NIOSequentialFile.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/NIOSequentialFile.java
deleted file mode 100644
index 0dac7f2..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/NIOSequentialFile.java
+++ /dev/null
@@ -1,404 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.journal.impl;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.channels.FileChannel;
-import java.util.concurrent.Executor;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.artemis.api.core.ActiveMQException;
-import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
-import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
-import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
-import org.apache.activemq.artemis.core.journal.IOAsyncTask;
-import org.apache.activemq.artemis.core.journal.SequentialFile;
-import org.apache.activemq.artemis.core.journal.SequentialFileFactory;
-import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
-import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
-
-public final class NIOSequentialFile extends AbstractSequentialFile
-{
-   private FileChannel channel;
-
-   private RandomAccessFile rfile;
-
-   /**
-    * The write semaphore here is only used when writing asynchronously
-    */
-   private Semaphore maxIOSemaphore;
-
-   private final int defaultMaxIO;
-
-   private int maxIO;
-
-   public NIOSequentialFile(final SequentialFileFactory factory,
-                            final File directory,
-                            final String file,
-                            final int maxIO,
-                            final Executor writerExecutor)
-   {
-      super(directory, file, factory, writerExecutor);
-      defaultMaxIO = maxIO;
-   }
-
-   public int getAlignment()
-   {
-      return 1;
-   }
-
-   public int calculateBlockStart(final int position)
-   {
-      return position;
-   }
-
-   public synchronized boolean isOpen()
-   {
-      return channel != null;
-   }
-
-   /**
-    * this.maxIO represents the default maxIO.
-    * Some operations while initializing files on the journal may require a different maxIO
-    */
-   public synchronized void open() throws IOException
-   {
-      open(defaultMaxIO, true);
-   }
-
-   public void open(final int maxIO, final boolean useExecutor) throws IOException
-   {
-      try
-      {
-         rfile = new RandomAccessFile(getFile(), "rw");
-
-         channel = rfile.getChannel();
-
-         fileSize = channel.size();
-      }
-      catch (IOException e)
-      {
-         factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
-         throw e;
-      }
-
-      if (writerExecutor != null && useExecutor)
-      {
-         maxIOSemaphore = new Semaphore(maxIO);
-         this.maxIO = maxIO;
-      }
-   }
-
-   public void fill(final int position, final int size, final byte fillCharacter) throws IOException
-   {
-      ByteBuffer bb = ByteBuffer.allocate(size);
-
-      for (int i = 0; i < size; i++)
-      {
-         bb.put(fillCharacter);
-      }
-
-      bb.flip();
-
-      try
-      {
-         channel.position(position);
-         channel.write(bb);
-         channel.force(false);
-         channel.position(0);
-      }
-      catch (IOException e)
-      {
-         factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
-         throw e;
-      }
-
-      fileSize = channel.size();
-   }
-
-   public synchronized void waitForClose() throws InterruptedException
-   {
-      while (isOpen())
-      {
-         wait();
-      }
-   }
-
-   @Override
-   public synchronized void close() throws IOException, InterruptedException, ActiveMQException
-   {
-      super.close();
-
-      if (maxIOSemaphore != null)
-      {
-         while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS))
-         {
-            ActiveMQJournalLogger.LOGGER.errorClosingFile(getFileName());
-         }
-      }
-
-      maxIOSemaphore = null;
-      try
-      {
-         if (channel != null)
-         {
-            channel.close();
-         }
-
-         if (rfile != null)
-         {
-            rfile.close();
-         }
-      }
-      catch (IOException e)
-      {
-         factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
-         throw e;
-      }
-      channel = null;
-
-      rfile = null;
-
-      notifyAll();
-   }
-
-   public int read(final ByteBuffer bytes) throws Exception
-   {
-      return read(bytes, null);
-   }
-
-   public synchronized int read(final ByteBuffer bytes, final IOAsyncTask callback) throws IOException,
-      ActiveMQIllegalStateException
-   {
-      try
-      {
-         if (channel == null)
-         {
-            throw new ActiveMQIllegalStateException("File " + this.getFileName() + " has a null channel");
-         }
-         int bytesRead = channel.read(bytes);
-
-         if (callback != null)
-         {
-            callback.done();
-         }
-
-         bytes.flip();
-
-         return bytesRead;
-      }
-      catch (IOException e)
-      {
-         if (callback != null)
-         {
-            callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getLocalizedMessage());
-         }
-
-         factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
-
-         throw e;
-      }
-   }
-
-   public void sync() throws IOException
-   {
-      if (channel != null)
-      {
-         try
-         {
-            channel.force(false);
-         }
-         catch (IOException e)
-         {
-            factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
-            throw e;
-         }
-      }
-   }
-
-   public long size() throws IOException
-   {
-      if (channel == null)
-      {
-         return getFile().length();
-      }
-
-      try
-      {
-         return channel.size();
-      }
-      catch (IOException e)
-      {
-         factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
-         throw e;
-      }
-   }
-
-   @Override
-   public void position(final long pos) throws IOException
-   {
-      try
-      {
-         super.position(pos);
-         channel.position(pos);
-      }
-      catch (IOException e)
-      {
-         factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
-         throw e;
-      }
-   }
-
-   @Override
-   public String toString()
-   {
-      return "NIOSequentialFile " + getFile();
-   }
-
-   public SequentialFile cloneFile()
-   {
-      return new NIOSequentialFile(factory, directory, getFileName(), maxIO, writerExecutor);
-   }
-
-   public void writeDirect(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback)
-   {
-      if (callback == null)
-      {
-         throw new NullPointerException("callback parameter need to be set");
-      }
-
-      try
-      {
-         internalWrite(bytes, sync, callback);
-      }
-      catch (Exception e)
-      {
-         callback.onError(ActiveMQExceptionType.GENERIC_EXCEPTION.getCode(), e.getMessage());
-      }
-   }
-
-   public void writeDirect(final ByteBuffer bytes, final boolean sync) throws Exception
-   {
-      internalWrite(bytes, sync, null);
-   }
-
-   public void writeInternal(final ByteBuffer bytes) throws Exception
-   {
-      internalWrite(bytes, true, null);
-   }
-
-   @Override
-   protected ByteBuffer newBuffer(int size, final int limit)
-   {
-      // For NIO, we don't need to allocate a buffer the entire size of the timed buffer, unlike AIO
-
-      size = limit;
-
-      return super.newBuffer(size, limit);
-   }
-
-   private void internalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws IOException, ActiveMQIOErrorException, InterruptedException
-   {
-      if (!isOpen())
-      {
-         if (callback != null)
-         {
-            callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "File not opened");
-         }
-         else
-         {
-            throw ActiveMQJournalBundle.BUNDLE.fileNotOpened();
-         }
-         return;
-      }
-
-      position.addAndGet(bytes.limit());
-
-      if (maxIOSemaphore == null || callback == null)
-      {
-         // if maxIOSemaphore == null, that means we are not using executors and the writes are synchronous
-         try
-         {
-            doInternalWrite(bytes, sync, callback);
-         }
-         catch (IOException e)
-         {
-            factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
-         }
-      }
-      else
-      {
-         // This is a flow control on writing, just like maxAIO on libaio
-         maxIOSemaphore.acquire();
-
-         writerExecutor.execute(new Runnable()
-         {
-            public void run()
-            {
-               try
-               {
-                  try
-                  {
-                     doInternalWrite(bytes, sync, callback);
-                  }
-                  catch (IOException e)
-                  {
-                     ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e);
-                     factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), NIOSequentialFile.this);
-                     callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
-                  }
-                  catch (Throwable e)
-                  {
-                     ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e);
-                     callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
-                  }
-               }
-               finally
-               {
-                  maxIOSemaphore.release();
-               }
-            }
-         });
-      }
-   }
-
-   /**
-    * @param bytes
-    * @param sync
-    * @param callback
-    * @throws IOException
-    * @throws Exception
-    */
-   private void doInternalWrite(final ByteBuffer bytes, final boolean sync, final IOAsyncTask callback) throws IOException
-   {
-      channel.write(bytes);
-
-      if (sync)
-      {
-         sync();
-      }
-
-      if (callback != null)
-      {
-         callback.done();
-      }
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6fe9e0eb/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/NIOSequentialFileFactory.java
----------------------------------------------------------------------
diff --git a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/NIOSequentialFileFactory.java b/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/NIOSequentialFileFactory.java
deleted file mode 100644
index e471928..0000000
--- a/artemis-journal/src/main/java/org/apache/activemq/artemis/core/journal/impl/NIOSequentialFileFactory.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.journal.impl;
-
-import java.io.File;
-import java.lang.ref.WeakReference;
-import java.nio.ByteBuffer;
-
-import org.apache.activemq.artemis.core.journal.IOCriticalErrorListener;
-import org.apache.activemq.artemis.core.journal.SequentialFile;
-
-public class NIOSequentialFileFactory extends AbstractSequentialFileFactory
-{
-   public NIOSequentialFileFactory(final File journalDir)
-   {
-      this(journalDir, null);
-   }
-
-   public NIOSequentialFileFactory(final File journalDir, final IOCriticalErrorListener listener)
-   {
-      this(journalDir,
-           false,
-           JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO,
-           JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO,
-           false,
-           listener);
-   }
-
-   public NIOSequentialFileFactory(final File journalDir, final boolean buffered)
-   {
-      this(journalDir, buffered, null);
-   }
-
-   public NIOSequentialFileFactory(final File journalDir,
-                                   final boolean buffered,
-                                   final IOCriticalErrorListener listener)
-   {
-      this(journalDir,
-           buffered,
-           JournalConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO,
-           JournalConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO,
-           false,
-           listener);
-   }
-
-   public NIOSequentialFileFactory(final File journalDir,
-                                   final boolean buffered,
-                                   final int bufferSize,
-                                   final int bufferTimeout,
-                                   final boolean logRates)
-   {
-      this(journalDir, buffered, bufferSize, bufferTimeout, logRates, null);
-   }
-
-   public NIOSequentialFileFactory(final File journalDir,
-                                   final boolean buffered,
-                                   final int bufferSize,
-                                   final int bufferTimeout,
-                                   final boolean logRates,
-                                   final IOCriticalErrorListener listener)
-   {
-      super(journalDir, buffered, bufferSize, bufferTimeout, logRates, listener);
-   }
-
-   public SequentialFile createSequentialFile(final String fileName, int maxIO)
-   {
-      if (maxIO < 1)
-      {
-         // A single threaded IO
-         maxIO = 1;
-      }
-
-      return new NIOSequentialFile(this, journalDir, fileName, maxIO, writeExecutor);
-   }
-
-   public boolean isSupportsCallbacks()
-   {
-      return timedBuffer != null;
-   }
-
-
-   public ByteBuffer allocateDirectBuffer(final int size)
-   {
-      // Using direct buffer, as described on https://jira.jboss.org/browse/HORNETQ-467
-      ByteBuffer buffer2 = null;
-      try
-      {
-         buffer2 = ByteBuffer.allocateDirect(size);
-      }
-      catch (OutOfMemoryError error)
-      {
-         // This is a workaround for the way the JDK will deal with native buffers.
-         // the main portion is outside of the VM heap
-         // and the JDK will not have any reference about it to take GC into account
-         // so we force a GC and try again.
-         WeakReference<Object> obj = new WeakReference<Object>(new Object());
-         try
-         {
-            long timeout = System.currentTimeMillis() + 5000;
-            while (System.currentTimeMillis() > timeout && obj.get() != null)
-            {
-               System.gc();
-               Thread.sleep(100);
-            }
-         }
-         catch (InterruptedException e)
-         {
-         }
-
-         buffer2 = ByteBuffer.allocateDirect(size);
-
-      }
-      return buffer2;
-   }
-
-   public void releaseDirectBuffer(ByteBuffer buffer)
-   {
-      // nothing we can do on this case. we can just have good faith on GC
-   }
-
-   public ByteBuffer newBuffer(final int size)
-   {
-      return ByteBuffer.allocate(size);
-   }
-
-   public void clearBuffer(final ByteBuffer buffer)
-   {
-      final int limit = buffer.limit();
-      buffer.rewind();
-
-      for (int i = 0; i < limit; i++)
-      {
-         buffer.put((byte)0);
-      }
-
-      buffer.rewind();
-   }
-
-   public ByteBuffer wrapBuffer(final byte[] bytes)
-   {
-      return ByteBuffer.wrap(bytes);
-   }
-
-   public int getAlignment()
-   {
-      return 1;
-   }
-
-   public int calculateBlockSize(final int bytes)
-   {
-      return bytes;
-   }
-
-}