You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ce...@apache.org on 2015/02/20 00:35:15 UTC

activemq git commit: Updated for https://issues.apache.org/jira/browse/AMQ-5578 adds unit tests, logging, and removes the preallocated batch stuff that snuck in there with commit https://github.com/apache/activemq/commit/45e59e6e839ae89ffc099d32a4180ee307543aae which was by accident.

Repository: activemq
Updated Branches:
  refs/heads/master 37b1b6a21 -> 023b2ac04


Updated for https://issues.apache.org/jira/browse/AMQ-5578 adds unit tests, logging, and removes the preallocated batch stuff that snuck in there with commit https://github.com/apache/activemq/commit/45e59e6e839ae89ffc099d32a4180ee307543aae which was by accident.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/023b2ac0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/023b2ac0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/023b2ac0

Branch: refs/heads/master
Commit: 023b2ac04501a97452eb8a23b6024fd336cb6cf6
Parents: 37b1b6a
Author: Christian Posta <ch...@gmail.com>
Authored: Thu Feb 19 16:34:32 2015 -0700
Committer: Christian Posta <ch...@gmail.com>
Committed: Thu Feb 19 16:34:32 2015 -0700

----------------------------------------------------------------------
 .../activemq/store/kahadb/MessageDatabase.java  |  9 --
 .../store/kahadb/disk/journal/DataFile.java     | 36 --------
 .../kahadb/disk/journal/DataFileAppender.java   |  4 -
 .../store/kahadb/disk/journal/Journal.java      | 15 +---
 .../disk/journal/PreallocationJournalTest.java  | 89 ++++++++++++++++++++
 5 files changed, 93 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/023b2ac0/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 9fc29f4..54188fb 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -237,7 +237,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
     long cleanupInterval = 30*1000;
     int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
     int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
-    int preallocationBatchSize = Journal.DEFAULT_PREALLOCATION_BATCH_SIZE;
     boolean enableIndexWriteAsync = false;
     int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
     private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name();
@@ -2493,7 +2492,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase()));
         manager.setPreallocationStrategy(
                 Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase()));
-        manager.setPreallocationBatchSize(preallocationBatchSize);
         if (getDirectoryArchive() != null) {
             IOHelper.mkdirs(getDirectoryArchive());
             manager.setDirectoryArchive(getDirectoryArchive());
@@ -3199,11 +3197,4 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
         this.preallocationStrategy = preallocationStrategy;
     }
 
-    public int getPreallocationBatchSize() {
-        return preallocationBatchSize;
-    }
-
-    public void setPreallocationBatchSize(int preallocationBatchSize) {
-        this.preallocationBatchSize = preallocationBatchSize;
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/023b2ac0/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
index ac35866..f1e078d 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
@@ -42,7 +42,6 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
     protected final Integer dataFileId;
     protected volatile int length;
     protected final SequenceSet corruptedBlocks = new SequenceSet();
-    protected long preallocationBatchWindow = 0L;
 
     DataFile(File file, int number) {
         this.file = file;
@@ -68,7 +67,6 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
 
     public synchronized void incrementLength(int size) {
         length += size;
-        preallocationBatchWindow -= size;
     }
 
     @Override
@@ -115,38 +113,4 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
         return dataFileId;
     }
 
-    public void preallocateJournalBatch(Journal journal, long newMessageSize) {
-
-        if (preallocationBatchWindow - newMessageSize <= 0) {
-            int preallocationBatchSize = Math.min(journal.getPreallocationBatchSize(),
-                    journal.maxFileLength - length);
-            doPreallocation(preallocationBatchSize);
-            preallocationBatchWindow = preallocationBatchSize;
-        }
-    }
-
-    private void doPreallocation(int size) {
-        try {
-            RecoverableRandomAccessFile file = openRandomAccessFile();
-            FileChannel channel = file.getChannel();
-
-            channel.position(length+1);
-            ByteBuffer buffer = generateAllocation(size);
-            channel.write(buffer);
-            channel.force(false);
-            file.close();
-        } catch (IOException e) {
-            LOG.debug("Cannot allocate batch for journal, continue without preallocation of batch...");
-        }
-
-    }
-
-    private ByteBuffer generateAllocation(int size) {
-        ByteBuffer rc = ByteBuffer.allocate(size);
-        for (int i = 0; i < size; i++) {
-            rc.put((byte) 0x00);
-        }
-        rc.flip();
-        return rc;
-    }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/023b2ac0/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
index 45ae047..fbb276a 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
@@ -211,10 +211,6 @@ class DataFileAppender implements FileAppender {
                         file = journal.rotateWriteFile();
                     }
 
-                    // will do batch preallocation on the journal if configured
-                    if (journal.preallocationScope == Journal.PreallocationScope.BATCH) {
-                        file.preallocateJournalBatch(journal, write.location.getSize());
-                    }
 
                     nextWriteBatch = newWriteBatch(write, file);
                     enqueueMutex.notifyAll();

http://git-wip-us.apache.org/repos/asf/activemq/blob/023b2ac0/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index 221e929..91f82ae 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -61,7 +61,6 @@ public class Journal {
     }
 
     public enum PreallocationScope {
-        BATCH,
         ENTIRE_JOURNAL;
     }
 
@@ -87,7 +86,6 @@ public class Journal {
     public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
     public static final int PREFERED_DIFF = 1024 * 512;
     public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;
-    public static final int DEFAULT_PREALLOCATION_BATCH_SIZE = 1024 * 1024 * 1;
 
     private static final Logger LOG = LoggerFactory.getLogger(Journal.class);
 
@@ -123,7 +121,6 @@ public class Journal {
 
     protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
     protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
-    protected int preallocationBatchSize = DEFAULT_PREALLOCATION_BATCH_SIZE;
 
     public interface DataFileRemovedListener {
         void fileRemoved(DataFile datafile);
@@ -183,6 +180,10 @@ public class Journal {
 
         getCurrentWriteFile();
 
+        if (preallocationStrategy != PreallocationStrategy.SPARSE_FILE && maxFileLength != DEFAULT_MAX_FILE_LENGTH) {
+            LOG.warn("You are using a preallocation strategy and journal maxFileLength which should be benchmarked accordingly to not introduce unexpected latencies.");
+        }
+
         if( lastAppendLocation.get()==null ) {
             DataFile df = dataFiles.getTail();
             lastAppendLocation.set(recoveryCheck(df));
@@ -678,14 +679,6 @@ public class Journal {
         this.preallocationScope = preallocationScope;
     }
 
-    public int getPreallocationBatchSize() {
-        return preallocationBatchSize;
-    }
-
-    public void setPreallocationBatchSize(int preallocationBatchSize) {
-        this.preallocationBatchSize = preallocationBatchSize;
-    }
-
     public File getDirectory() {
         return directory;
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/023b2ac0/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java
new file mode 100644
index 0000000..bf5b65b
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.journal;
+
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Created by ceposta
+ * <a href="http://christianposta.com/blog">http://christianposta.com/blog</a>.
+ */
+public class PreallocationJournalTest  {
+
+    @Test
+    public void testSparseFilePreallocation() throws Exception {
+        executeTest("sparse_file");
+    }
+
+    @Test
+    public void testOSCopyPreallocation() throws Exception {
+        executeTest("os_kernel_copy");
+    }
+
+    @Test
+    public void testZerosPreallocation() throws Exception {
+        executeTest("zeros");
+    }
+
+    private void executeTest(String preallocationStrategy)throws Exception {
+        Random rand = new Random();
+        int randInt = rand.nextInt(100);
+        File dataDirectory = new File("./target/activemq-data/kahadb" + randInt);
+
+        KahaDBStore store = new KahaDBStore();
+        store.deleteAllMessages();
+        store.setDirectory(dataDirectory);
+        store.setPreallocationStrategy(preallocationStrategy);
+        store.start();
+
+        // time for files to get there.. i know this is a brittle test! need to find
+        // a better way (callbacks?) to notify when the journal is completely up
+        TimeUnit.MILLISECONDS.sleep(500);
+        File journalLog = new File(dataDirectory, "db-1.log");
+        assertTrue(journalLog.exists());
+
+
+        FileInputStream is = new FileInputStream(journalLog);
+        FileChannel channel = is.getChannel();
+        assertEquals(Journal.DEFAULT_MAX_FILE_LENGTH, channel.size());
+
+        channel.position(1 * 1024 * 1024 + 1);
+        ByteBuffer buff = ByteBuffer.allocate(1);
+        channel.read(buff);
+        buff.flip();
+        buff.position(0);
+        assertEquals(0x00, buff.get());
+
+        System.out.println("File size: " + channel.size());
+
+
+        store.stop();
+    }
+
+
+}