You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2017/03/07 12:29:21 UTC

activemq git commit: [AMQ-6606] - reset next batch so it does not get reused after error, and refine fix to apply only to sync write batches because async locations will already be in the index

Repository: activemq
Updated Branches:
  refs/heads/master 0d824a8e6 -> 21ae1ef2e


[AMQ-6606] - reset next batch so it does not get reused after error, and refine fix to apply only to sync write batches because async locations will already be in the index


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/21ae1ef2
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/21ae1ef2
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/21ae1ef2

Branch: refs/heads/master
Commit: 21ae1ef2e658a14f89bde79a83a32af340fb351a
Parents: 0d824a8
Author: gtully <ga...@gmail.com>
Authored: Tue Mar 7 12:28:58 2017 +0000
Committer: gtully <ga...@gmail.com>
Committed: Tue Mar 7 12:28:58 2017 +0000

----------------------------------------------------------------------
 .../kahadb/disk/journal/DataFileAppender.java   | 21 +++++-
 .../DataFileAppenderNoSpaceNoBatchTest.java     | 68 ++++++++++++++++++++
 2 files changed, 86 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/21ae1ef2/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
index 25c4e28..3153a50 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
@@ -355,7 +355,11 @@ class DataFileAppender implements FileAppender {
             synchronized (enqueueMutex) {
                 running = false;
                 signalError(wb, error);
-                signalError(nextWriteBatch, error);
+                if (nextWriteBatch != null) {
+                    signalError(nextWriteBatch, error);
+                    nextWriteBatch = null;
+                    enqueueMutex.notifyAll();
+                }
             }
         } finally {
             try {
@@ -402,12 +406,23 @@ class DataFileAppender implements FileAppender {
         if (wb != null) {
             if (t instanceof IOException) {
                 wb.exception.set((IOException) t);
-                // revert batch increment such that next write is contiguous
-                wb.dataFile.decrementLength(wb.size);
+                // revert sync batch increment such that next write is contiguous
+                if (syncBatch(wb.writes)) {
+                    wb.dataFile.decrementLength(wb.size);
+                }
             } else {
                 wb.exception.set(IOExceptionSupport.create(t));
             }
             signalDone(wb);
         }
     }
+
+    // true only if every write in the batch is sync; async writes will already be in the index so reuse is not an option
+    private boolean syncBatch(LinkedNodeList<Journal.WriteCommand> writes) {
+        Journal.WriteCommand write = writes.getHead();
+        while (write != null && write.sync) { // stop at the first non-sync write
+            write = write.getNext();
+        }
+        return write == null; // null means the walk ran off the end without meeting a non-sync write
+    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/21ae1ef2/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
index aa6df3f..ec68d13 100644
--- a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppenderNoSpaceNoBatchTest.java
@@ -21,17 +21,27 @@ import org.apache.activemq.util.RecoverableRandomAccessFile;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 public class DataFileAppenderNoSpaceNoBatchTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DataFileAppenderNoSpaceNoBatchTest.class);
+
     @Rule
     public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
 
@@ -77,4 +87,62 @@ public class DataFileAppenderNoSpaceNoBatchTest {
         assertEquals("offset is reused", seekPositions.get(0), seekPositions.get(1));
 
     }
+
+    
+    @Test(timeout = 10000)
+    public void testNoSpaceNextWriteSameBatchAsync() throws Exception { // a failed write must not lead to a seek offset being used twice
+        final List<Long> seekPositions = Collections.synchronizedList(new ArrayList<Long>()); // every offset passed to seek(), in call order
+
+        final DataFile currentDataFile = new DataFile(dataFileDir.newFile(), 0) {
+            public RecoverableRandomAccessFile appendRandomAccessFile() throws IOException {
+
+                return new RecoverableRandomAccessFile(dataFileDir.newFile(), "rw") {
+
+                    public void seek(long pos) throws IOException {
+                        seekPositions.add(pos); // only records the offset; no real seek is performed
+                    }
+
+                    public void write(byte[] bytes, int offset, int len) throws IOException {
+                        if (seekPositions.size() == 2) { // fail only while exactly two seeks have been recorded
+                            throw new IOException("No space on device: " + seekPositions.size());
+                        }
+                    } // otherwise the bytes are silently discarded
+                };
+            };
+        };
+
+        underTest = new DataFileAppender(new Journal() {
+            @Override
+            public DataFile getCurrentDataFile(int capacity) throws IOException {
+                return currentDataFile; // always hand back the stubbed data file
+            };
+
+            @Override
+            public int getWriteBatchSize() {
+                // force multiple async batches
+                return 4*1024;
+            }
+        });
+
+        final ByteSequence byteSequence = new ByteSequence(new byte[1024]); // 1 KiB payload reused for every write
+
+        ConcurrentLinkedQueue<Location> locations = new ConcurrentLinkedQueue<Location>();
+        HashSet<CountDownLatch> latches = new HashSet<CountDownLatch>(); // set keeps each distinct latch once
+        for (int i = 0; i <= 20; i++) { // 21 writes in total
+            Location location = underTest.storeItem(byteSequence, (byte) 1, false); // NOTE(review): the false argument is presumably sync=false, per the test name - confirm
+            locations.add(location);
+            latches.add(location.getLatch()); // NOTE(review): presumably one latch per write batch - confirm against DataFileAppender
+        }
+
+        for (CountDownLatch latch: latches) {
+            assertTrue("write complete", latch.await(5, TimeUnit.SECONDS)); // every latch must be released, including after the injected failure
+        }
+
+        LOG.info("Latches count: " + latches.size());
+        LOG.info("Seeks: " + seekPositions);
+
+        assertTrue("got more than on latch: " + latches.size(), latches.size() > 1);
+        assertTrue("got seeks: " + seekPositions, seekPositions.size() > 2); // seeking continued past the point where write() fails
+        assertEquals("no duplicates: " + seekPositions, seekPositions.size(), new HashSet<Long>(seekPositions).size()); // no offset was seeked to twice
+    }
 }