You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/11/19 13:25:53 UTC

svn commit: r882126 - in /activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async: DataFileAppender.java NIODataFileAppender.java

Author: dejanb
Date: Thu Nov 19 12:25:52 2009
New Revision: 882126

URL: http://svn.apache.org/viewvc?rev=882126&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2042 - making amq store resilient to 'no space left'

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java?rev=882126&r1=882125&r2=882126&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java Thu Nov 19 12:25:52 2009
@@ -21,6 +21,7 @@
 import java.io.RandomAccessFile;
 import java.util.Map;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.util.DataByteArrayOutputStream;
@@ -48,7 +49,7 @@
     protected final CountDownLatch shutdownDone = new CountDownLatch(1);
     protected int maxWriteBatchSize = DEFAULT_MAX_BATCH_SIZE;
 
-    private boolean running;
+    protected boolean running;
     private Thread thread;
 
     public static class WriteKey {
@@ -82,6 +83,7 @@
         public final WriteCommand first;
         public final CountDownLatch latch = new CountDownLatch(1);
         public int size;
+        public AtomicReference<IOException> exception = new AtomicReference<IOException>();
 
         public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException {
             this.dataFile = dataFile;
@@ -179,6 +181,10 @@
             } catch (InterruptedException e) {
                 throw new InterruptedIOException();
             }
+            IOException exception = batch.exception.get(); 
+            if (exception != null) {
+                throw exception;
+            }
         }
 
         return location;
@@ -216,10 +222,7 @@
             if (shutdown) {
                 throw new IOException("Async Writter Thread Shutdown");
             }
-            if (firstAsyncException != null) {
-                throw firstAsyncException;
-            }
-
+            
             if (!running) {
                 running = true;
                 thread = new Thread() {
@@ -231,6 +234,11 @@
                 thread.setDaemon(true);
                 thread.setName("ActiveMQ Data File Writer");
                 thread.start();
+                firstAsyncException = null;
+            }
+            
+            if (firstAsyncException != null) {
+                throw firstAsyncException;
             }
 
             if (nextWriteBatch == null) {
@@ -298,6 +306,7 @@
     protected void processQueue() {
         DataFile dataFile = null;
         RandomAccessFile file = null;
+        WriteBatch wb = null;
         try {
 
             DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
@@ -321,7 +330,7 @@
                     enqueueMutex.notify();
                 }
 
-                WriteBatch wb = (WriteBatch)o;
+                wb = (WriteBatch)o;
                 if (dataFile != wb.dataFile) {
                     if (file != null) {
                         dataFile.closeRandomAccessFile(file);
@@ -406,6 +415,14 @@
         } catch (IOException e) {
             synchronized (enqueueMutex) {
                 firstAsyncException = e;
+                if (wb != null) {
+                    wb.latch.countDown();
+                    wb.exception.set(e);
+                }
+                if (nextWriteBatch != null) {
+                    nextWriteBatch.latch.countDown();
+                    nextWriteBatch.exception.set(e);
+                }
             }
         } catch (InterruptedException e) {
         } finally {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java?rev=882126&r1=882125&r2=882126&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java Thu Nov 19 12:25:52 2009
@@ -47,6 +47,7 @@
         DataFile dataFile = null;
         RandomAccessFile file = null;
         FileChannel channel = null;
+        WriteBatch wb = null;
 
         try {
 
@@ -81,7 +82,7 @@
                     enqueueMutex.notify();
                 }
 
-                WriteBatch wb = (WriteBatch)o;
+                wb = (WriteBatch)o;
                 if (dataFile != wb.dataFile) {
                     if (file != null) {
                         dataFile.closeRandomAccessFile(file);
@@ -180,16 +181,32 @@
         } catch (IOException e) {
             synchronized (enqueueMutex) {
                 firstAsyncException = e;
+                if (wb != null) {
+                    wb.latch.countDown();
+                    wb.exception.set(e);
+                }
+                if (nextWriteBatch != null) {
+                    nextWriteBatch.latch.countDown();
+                    nextWriteBatch.exception.set(e);
+                }
             }
         } catch (InterruptedException e) {
         } finally {
             try {
                 if (file != null) {
                     dataFile.closeRandomAccessFile(file);
+                    dataFile = null;
+                    file.close();
+                    file = null;
+                }
+                if (channel != null) {
+                    channel.close();
+                    channel = null;
                 }
             } catch (IOException e) {
             }
             shutdownDone.countDown();
+            running = false;
         }
     }