You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ce...@apache.org on 2015/02/20 00:35:15 UTC
activemq git commit: Updated for
https://issues.apache.org/jira/browse/AMQ-5578 adds unit tests, logging,
and removes the preallocated batch stuff that snuck in there with commit
https://github.com/apache/activemq/commit/45e59e6e839ae89ffc099d32a4180ee30
Repository: activemq
Updated Branches:
refs/heads/master 37b1b6a21 -> 023b2ac04
Updated for https://issues.apache.org/jira/browse/AMQ-5578 adds unit tests, logging, and removes the preallocated batch stuff that snuck in there with commit https://github.com/apache/activemq/commit/45e59e6e839ae89ffc099d32a4180ee307543aae which was by accident.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/023b2ac0
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/023b2ac0
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/023b2ac0
Branch: refs/heads/master
Commit: 023b2ac04501a97452eb8a23b6024fd336cb6cf6
Parents: 37b1b6a
Author: Christian Posta <ch...@gmail.com>
Authored: Thu Feb 19 16:34:32 2015 -0700
Committer: Christian Posta <ch...@gmail.com>
Committed: Thu Feb 19 16:34:32 2015 -0700
----------------------------------------------------------------------
.../activemq/store/kahadb/MessageDatabase.java | 9 --
.../store/kahadb/disk/journal/DataFile.java | 36 --------
.../kahadb/disk/journal/DataFileAppender.java | 4 -
.../store/kahadb/disk/journal/Journal.java | 15 +---
.../disk/journal/PreallocationJournalTest.java | 89 ++++++++++++++++++++
5 files changed, 93 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/023b2ac0/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
index 9fc29f4..54188fb 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java
@@ -237,7 +237,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
long cleanupInterval = 30*1000;
int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
- int preallocationBatchSize = Journal.DEFAULT_PREALLOCATION_BATCH_SIZE;
boolean enableIndexWriteAsync = false;
int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name();
@@ -2493,7 +2492,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase()));
manager.setPreallocationStrategy(
Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase()));
- manager.setPreallocationBatchSize(preallocationBatchSize);
if (getDirectoryArchive() != null) {
IOHelper.mkdirs(getDirectoryArchive());
manager.setDirectoryArchive(getDirectoryArchive());
@@ -3199,11 +3197,4 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
this.preallocationStrategy = preallocationStrategy;
}
- public int getPreallocationBatchSize() {
- return preallocationBatchSize;
- }
-
- public void setPreallocationBatchSize(int preallocationBatchSize) {
- this.preallocationBatchSize = preallocationBatchSize;
- }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/023b2ac0/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
index ac35866..f1e078d 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
@@ -42,7 +42,6 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
protected final Integer dataFileId;
protected volatile int length;
protected final SequenceSet corruptedBlocks = new SequenceSet();
- protected long preallocationBatchWindow = 0L;
DataFile(File file, int number) {
this.file = file;
@@ -68,7 +67,6 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
public synchronized void incrementLength(int size) {
length += size;
- preallocationBatchWindow -= size;
}
@Override
@@ -115,38 +113,4 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
return dataFileId;
}
- public void preallocateJournalBatch(Journal journal, long newMessageSize) {
-
- if (preallocationBatchWindow - newMessageSize <= 0) {
- int preallocationBatchSize = Math.min(journal.getPreallocationBatchSize(),
- journal.maxFileLength - length);
- doPreallocation(preallocationBatchSize);
- preallocationBatchWindow = preallocationBatchSize;
- }
- }
-
- private void doPreallocation(int size) {
- try {
- RecoverableRandomAccessFile file = openRandomAccessFile();
- FileChannel channel = file.getChannel();
-
- channel.position(length+1);
- ByteBuffer buffer = generateAllocation(size);
- channel.write(buffer);
- channel.force(false);
- file.close();
- } catch (IOException e) {
- LOG.debug("Cannot allocate batch for journal, continue without preallocation of batch...");
- }
-
- }
-
- private ByteBuffer generateAllocation(int size) {
- ByteBuffer rc = ByteBuffer.allocate(size);
- for (int i = 0; i < size; i++) {
- rc.put((byte) 0x00);
- }
- rc.flip();
- return rc;
- }
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/023b2ac0/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
index 45ae047..fbb276a 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
@@ -211,10 +211,6 @@ class DataFileAppender implements FileAppender {
file = journal.rotateWriteFile();
}
- // will do batch preallocation on the journal if configured
- if (journal.preallocationScope == Journal.PreallocationScope.BATCH) {
- file.preallocateJournalBatch(journal, write.location.getSize());
- }
nextWriteBatch = newWriteBatch(write, file);
enqueueMutex.notifyAll();
http://git-wip-us.apache.org/repos/asf/activemq/blob/023b2ac0/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index 221e929..91f82ae 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -61,7 +61,6 @@ public class Journal {
}
public enum PreallocationScope {
- BATCH,
ENTIRE_JOURNAL;
}
@@ -87,7 +86,6 @@ public class Journal {
public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
public static final int PREFERED_DIFF = 1024 * 512;
public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;
- public static final int DEFAULT_PREALLOCATION_BATCH_SIZE = 1024 * 1024 * 1;
private static final Logger LOG = LoggerFactory.getLogger(Journal.class);
@@ -123,7 +121,6 @@ public class Journal {
protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
- protected int preallocationBatchSize = DEFAULT_PREALLOCATION_BATCH_SIZE;
public interface DataFileRemovedListener {
void fileRemoved(DataFile datafile);
@@ -183,6 +180,10 @@ public class Journal {
getCurrentWriteFile();
+ if (preallocationStrategy != PreallocationStrategy.SPARSE_FILE && maxFileLength != DEFAULT_MAX_FILE_LENGTH) {
+ LOG.warn("You are using a preallocation strategy and journal maxFileLength which should be benchmarked accordingly to not introduce unexpected latencies.");
+ }
+
if( lastAppendLocation.get()==null ) {
DataFile df = dataFiles.getTail();
lastAppendLocation.set(recoveryCheck(df));
@@ -678,14 +679,6 @@ public class Journal {
this.preallocationScope = preallocationScope;
}
- public int getPreallocationBatchSize() {
- return preallocationBatchSize;
- }
-
- public void setPreallocationBatchSize(int preallocationBatchSize) {
- this.preallocationBatchSize = preallocationBatchSize;
- }
-
public File getDirectory() {
return directory;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/023b2ac0/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java
new file mode 100644
index 0000000..bf5b65b
--- /dev/null
+++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/disk/journal/PreallocationJournalTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.store.kahadb.disk.journal;
+
+import org.apache.activemq.store.kahadb.KahaDBStore;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Created by ceposta
+ * <a href="http://christianposta.com/blog>http://christianposta.com/blog</a>.
+ */
+public class PreallocationJournalTest {
+
+ @Test
+ public void testSparseFilePreallocation() throws Exception {
+ executeTest("sparse_file");
+ }
+
+ @Test
+ public void testOSCopyPreallocation() throws Exception {
+ executeTest("os_kernel_copy");
+ }
+
+ @Test
+ public void testZerosPreallocation() throws Exception {
+ executeTest("zeros");
+ }
+
+ private void executeTest(String preallocationStrategy)throws Exception {
+ Random rand = new Random();
+ int randInt = rand.nextInt(100);
+ File dataDirectory = new File("./target/activemq-data/kahadb" + randInt);
+
+ KahaDBStore store = new KahaDBStore();
+ store.deleteAllMessages();
+ store.setDirectory(dataDirectory);
+ store.setPreallocationStrategy(preallocationStrategy);
+ store.start();
+
+ // time for files to get there.. i know this is a brittle test! need to find
+ // a better way (callbacks?) to notify when the journal is completely up
+ TimeUnit.MILLISECONDS.sleep(500);
+ File journalLog = new File(dataDirectory, "db-1.log");
+ assertTrue(journalLog.exists());
+
+
+ FileInputStream is = new FileInputStream(journalLog);
+ FileChannel channel = is.getChannel();
+ assertEquals(Journal.DEFAULT_MAX_FILE_LENGTH, channel.size());
+
+ channel.position(1 * 1024 * 1024 + 1);
+ ByteBuffer buff = ByteBuffer.allocate(1);
+ channel.read(buff);
+ buff.flip();
+ buff.position(0);
+ assertEquals(0x00, buff.get());
+
+ System.out.println("File size: " + channel.size());
+
+
+ store.stop();
+ }
+
+
+}