You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2009/11/20 15:40:14 UTC
svn commit: r882579 - in
/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async:
DataFileAppender.java NIODataFileAppender.java
Author: dejanb
Date: Fri Nov 20 14:40:13 2009
New Revision: 882579
URL: http://svn.apache.org/viewvc?rev=882579&view=rev
Log:
merging 882126 - https://issues.apache.org/activemq/browse/AMQ-2042
Modified:
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java
activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java?rev=882579&r1=882578&r2=882579&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/DataFileAppender.java Fri Nov 20 14:40:13 2009
@@ -21,6 +21,7 @@
import java.io.RandomAccessFile;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.DataByteArrayOutputStream;
@@ -48,7 +49,7 @@
protected final CountDownLatch shutdownDone = new CountDownLatch(1);
protected int maxWriteBatchSize = DEFAULT_MAX_BATCH_SIZE;
- private boolean running;
+ protected boolean running;
private Thread thread;
public static class WriteKey {
@@ -82,6 +83,7 @@
public final WriteCommand first;
public final CountDownLatch latch = new CountDownLatch(1);
public int size;
+ public AtomicReference<IOException> exception = new AtomicReference<IOException>();
public WriteBatch(DataFile dataFile, WriteCommand write) throws IOException {
this.dataFile = dataFile;
@@ -179,6 +181,10 @@
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
+ IOException exception = batch.exception.get();
+ if (exception != null) {
+ throw exception;
+ }
}
return location;
@@ -216,10 +222,7 @@
if (shutdown) {
throw new IOException("Async Writter Thread Shutdown");
}
- if (firstAsyncException != null) {
- throw firstAsyncException;
- }
-
+
if (!running) {
running = true;
thread = new Thread() {
@@ -231,6 +234,11 @@
thread.setDaemon(true);
thread.setName("ActiveMQ Data File Writer");
thread.start();
+ firstAsyncException = null;
+ }
+
+ if (firstAsyncException != null) {
+ throw firstAsyncException;
}
if (nextWriteBatch == null) {
@@ -298,6 +306,7 @@
protected void processQueue() {
DataFile dataFile = null;
RandomAccessFile file = null;
+ WriteBatch wb = null;
try {
DataByteArrayOutputStream buff = new DataByteArrayOutputStream(maxWriteBatchSize);
@@ -321,7 +330,7 @@
enqueueMutex.notify();
}
- WriteBatch wb = (WriteBatch)o;
+ wb = (WriteBatch)o;
if (dataFile != wb.dataFile) {
if (file != null) {
dataFile.closeRandomAccessFile(file);
@@ -406,6 +415,14 @@
} catch (IOException e) {
synchronized (enqueueMutex) {
firstAsyncException = e;
+ if (wb != null) {
+ wb.latch.countDown();
+ wb.exception.set(e);
+ }
+ if (nextWriteBatch != null) {
+ nextWriteBatch.latch.countDown();
+ nextWriteBatch.exception.set(e);
+ }
}
} catch (InterruptedException e) {
} finally {
Modified: activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java
URL: http://svn.apache.org/viewvc/activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java?rev=882579&r1=882578&r2=882579&view=diff
==============================================================================
--- activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java (original)
+++ activemq/branches/activemq-5.3/activemq-core/src/main/java/org/apache/activemq/kaha/impl/async/NIODataFileAppender.java Fri Nov 20 14:40:13 2009
@@ -47,6 +47,7 @@
DataFile dataFile = null;
RandomAccessFile file = null;
FileChannel channel = null;
+ WriteBatch wb = null;
try {
@@ -81,7 +82,7 @@
enqueueMutex.notify();
}
- WriteBatch wb = (WriteBatch)o;
+ wb = (WriteBatch)o;
if (dataFile != wb.dataFile) {
if (file != null) {
dataFile.closeRandomAccessFile(file);
@@ -180,16 +181,32 @@
} catch (IOException e) {
synchronized (enqueueMutex) {
firstAsyncException = e;
+ if (wb != null) {
+ wb.latch.countDown();
+ wb.exception.set(e);
+ }
+ if (nextWriteBatch != null) {
+ nextWriteBatch.latch.countDown();
+ nextWriteBatch.exception.set(e);
+ }
}
} catch (InterruptedException e) {
} finally {
try {
if (file != null) {
dataFile.closeRandomAccessFile(file);
+ dataFile = null;
+ file.close();
+ file = null;
+ }
+ if (channel != null) {
+ channel.close();
+ channel = null;
}
} catch (IOException e) {
}
shutdownDone.countDown();
+ running = false;
}
}