You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2017/03/07 12:29:21 UTC
activemq git commit: [AMQ-6606] - reset next batch so it does not get
reused after error, and refine the fix to apply only to sync write batches
because async locations will already be in the index
Repository: activemq
Updated Branches:
refs/heads/master 0d824a8e6 -> 21ae1ef2e
[AMQ-6606] - reset next batch so it does not get reused after error, and refine the fix to apply only to sync write batches because async locations will already be in the index
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/21ae1ef2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/21ae1ef2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/21ae1ef2
Branch: refs/heads/master
Commit: 21ae1ef2e658a14f89bde79a83a32af340fb351a
Parents: 0d824a8
Author: gtully <ga...@gmail.com>
Authored: Tue Mar 7 12:28:58 2017 +0000
Committer: gtully <ga...@gmail.com>
Committed: Tue Mar 7 12:28:58 2017 +0000
----------------------------------------------------------------------
.../kahadb/disk/journal/DataFileAppender.java | 21 +++++-
.../DataFileAppenderNoSpaceNoBatchTest.java | 68 ++++++++++++++++++++
2 files changed, 86 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/21ae1ef2/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
index 25c4e28..3153a50 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
@@ -355,7 +355,11 @@ class DataFileAppender implements FileAppender {
synchronized (enqueueMutex) {
running = false;
signalError(wb, error);
- signalError(nextWriteBatch, error);
+ if (nextWriteBatch != null) {
+ signalError(nextWriteBatch, error);
+ nextWriteBatch = null;
+ enqueueMutex.notifyAll();
+ }
}
} finally {
try {
@@ -402,12 +406,23 @@ class DataFileAppender implements FileAppender {
if (wb != null) {
if (t instanceof IOException) {
wb.exception.set((IOException) t);
- // revert batch increment such that next write is contiguous
- wb.dataFile.decrementLength(wb.size);
+ // revert sync batch increment such that next write is contiguous
+ if (syncBatch(wb.writes)) {
+ wb.dataFile.decrementLength(wb.size);
+ }
} else {
wb.exception.set(IOExceptionSupport.create(t));
}
signalDone(wb);
}
}
+
+ // async writes will already be in the index so reuse is not an option
+ private boolean syncBatch(LinkedNodeList<Journal.WriteCommand> writes) {
+ Journal.WriteCommand write = writes.getHead();
+ while (write != null && write.sync) {
+ write = write.getNext();
+ }
+ return write == null;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/21ae1ef2/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
index aa6df3f..ec68d13 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
@@ -21,17 +21,27 @@ import org.apache.activemq.util.RecoverableRandomAccessFile;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class DataFileAppenderNoSpaceNoBatchTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DataFileAppenderNoSpaceNoBatchTest.class);
+
@Rule
public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
@@ -77,4 +87,62 @@ public class DataFileAppenderNoSpaceNoBatchTest {
assertEquals("offset is reused", seekPositions.get(0), seekPositions.get(1));
}
+
+
+ @Test(timeout = 10000)
+ public void testNoSpaceNextWriteSameBatchAsync() throws Exception {
+ final List<Long> seekPositions = Collections.synchronizedList(new ArrayList<Long>());
+
+ final DataFile currentDataFile = new DataFile(dataFileDir.newFile(), 0) {
+ public RecoverableRandomAccessFile appendRandomAccessFile() throws IOException {
+
+ return new RecoverableRandomAccessFile(dataFileDir.newFile(), "rw") {
+
+ public void seek(long pos) throws IOException {
+ seekPositions.add(pos);
+ }
+
+ public void write(byte[] bytes, int offset, int len) throws IOException {
+ if (seekPositions.size() == 2) {
+ throw new IOException("No space on device: " + seekPositions.size());
+ }
+ }
+ };
+ };
+ };
+
+ underTest = new DataFileAppender(new Journal() {
+ @Override
+ public DataFile getCurrentDataFile(int capacity) throws IOException {
+ return currentDataFile;
+ };
+
+ @Override
+ public int getWriteBatchSize() {
+ // force multiple async batches
+ return 4*1024;
+ }
+ });
+
+ final ByteSequence byteSequence = new ByteSequence(new byte[1024]);
+
+ ConcurrentLinkedQueue<Location> locations = new ConcurrentLinkedQueue<Location>();
+ HashSet<CountDownLatch> latches = new HashSet<CountDownLatch>();
+ for (int i = 0; i <= 20; i++) {
+ Location location = underTest.storeItem(byteSequence, (byte) 1, false);
+ locations.add(location);
+ latches.add(location.getLatch());
+ }
+
+ for (CountDownLatch latch: latches) {
+ assertTrue("write complete", latch.await(5, TimeUnit.SECONDS));
+ }
+
+ LOG.info("Latches count: " + latches.size());
+ LOG.info("Seeks: " + seekPositions);
+
+ assertTrue("got more than on latch: " + latches.size(), latches.size() > 1);
+ assertTrue("got seeks: " + seekPositions, seekPositions.size() > 2);
+ assertEquals("no duplicates: " + seekPositions, seekPositions.size(), new HashSet<Long>(seekPositions).size());
+ }
}