You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/03/02 22:17:51 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5578
Repository: activemq
Updated Branches:
refs/heads/master 5d6d42ce9 -> e89b7a57f
https://issues.apache.org/jira/browse/AMQ-5578
Remove the unnecessary zero-initialization loop, since the Java specification
already requires newly allocated buffers to be zero-filled. Fix some compiler warnings.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e89b7a57
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e89b7a57
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e89b7a57
Branch: refs/heads/master
Commit: e89b7a57f1aa206830998ae022c041302a22baea
Parents: 5d6d42c
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Mar 2 16:17:43 2016 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Mar 2 16:17:43 2016 -0500
----------------------------------------------------------------------
.../store/kahadb/disk/journal/Journal.java | 112 +++++++++++--------
1 file changed, 63 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/e89b7a57/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index d00c377..ba0b9f5 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -16,28 +16,45 @@
*/
package org.apache.activemq.store.kahadb.disk.journal;
-import java.io.*;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
+
import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
-import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
-import org.apache.activemq.util.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
import org.apache.activemq.store.kahadb.disk.util.SchedulerTimerTask;
import org.apache.activemq.store.kahadb.disk.util.Sequence;
+import org.apache.activemq.util.ByteSequence;
+import org.apache.activemq.util.DataByteArrayInputStream;
+import org.apache.activemq.util.DataByteArrayOutputStream;
+import org.apache.activemq.util.IOHelper;
+import org.apache.activemq.util.RecoverableRandomAccessFile;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Manages DataFiles
- *
- *
*/
public class Journal {
public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
@@ -88,8 +105,7 @@ public class Journal {
}
private static byte[] createBatchControlRecordHeader() {
- try {
- DataByteArrayOutputStream os = new DataByteArrayOutputStream();
+ try (DataByteArrayOutputStream os = new DataByteArrayOutputStream();) {
os.writeInt(BATCH_CONTROL_RECORD_SIZE);
os.writeByte(BATCH_CONTROL_RECORD_TYPE);
os.write(BATCH_CONTROL_RECORD_MAGIC);
@@ -163,6 +179,7 @@ public class Journal {
appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);
File[] files = directory.listFiles(new FilenameFilter() {
+ @Override
public boolean accept(File dir, String n) {
return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
}
@@ -217,12 +234,13 @@ public class Journal {
totalLength.addAndGet(lastAppendLocation.get().getOffset() - maxFileLength);
}
-
cleanupTask = new Runnable() {
+ @Override
public void run() {
cleanup();
}
};
+
this.timer = new Timer("KahaDB Scheduler", true);
TimerTask task = new SchedulerTimerTask(cleanupTask);
this.timer.scheduleAtFixedRate(task, DEFAULT_CLEANUP_INTERVAL,DEFAULT_CLEANUP_INTERVAL);
@@ -230,21 +248,18 @@ public class Journal {
LOG.trace("Startup took: "+(end-start)+" ms");
}
-
public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) {
if (PreallocationScope.ENTIRE_JOURNAL == preallocationScope) {
if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
doPreallocationKernelCopy(file);
-
- }else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
+ } else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
doPreallocationZeros(file);
- }
- else {
+ } else {
doPreallocationSparseFile(file);
}
- }else {
+ } else {
LOG.info("Using journal preallocation scope of batch allocation");
}
}
@@ -260,10 +275,7 @@ public class Journal {
private void doPreallocationZeros(RecoverableRandomAccessFile file) {
ByteBuffer buffer = ByteBuffer.allocate(maxFileLength);
- for (int i = 0; i < maxFileLength; i++) {
- buffer.put((byte) 0x00);
- }
- buffer.flip();
+ buffer.limit(maxFileLength);
try {
FileChannel channel = file.getChannel();
@@ -385,49 +397,48 @@ public class Journal {
}
}
-
public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
- DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);
- reader.readFully(offset, controlRecord);
+ try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);) {
- // Assert that it's a batch record.
- for( int i=0; i < BATCH_CONTROL_RECORD_HEADER.length; i++ ) {
- if( controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i] ) {
- return -1;
+ reader.readFully(offset, controlRecord);
+
+ // Assert that it's a batch record.
+ for( int i=0; i < BATCH_CONTROL_RECORD_HEADER.length; i++ ) {
+ if( controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i] ) {
+ return -1;
+ }
}
- }
- int size = controlIs.readInt();
- if( size > MAX_BATCH_SIZE ) {
- return -1;
- }
+ int size = controlIs.readInt();
+ if( size > MAX_BATCH_SIZE ) {
+ return -1;
+ }
- if( isChecksum() ) {
+ if( isChecksum() ) {
- long expectedChecksum = controlIs.readLong();
- if( expectedChecksum == 0 ) {
- // Checksuming was not enabled when the record was stored.
- // we can't validate the record :(
- return size;
- }
+ long expectedChecksum = controlIs.readLong();
+ if( expectedChecksum == 0 ) {
+ // Checksuming was not enabled when the record was stored.
+ // we can't validate the record :(
+ return size;
+ }
- byte data[] = new byte[size];
- reader.readFully(offset+BATCH_CONTROL_RECORD_SIZE, data);
+ byte data[] = new byte[size];
+ reader.readFully(offset+BATCH_CONTROL_RECORD_SIZE, data);
- Checksum checksum = new Adler32();
- checksum.update(data, 0, data.length);
+ Checksum checksum = new Adler32();
+ checksum.update(data, 0, data.length);
- if( expectedChecksum!=checksum.getValue() ) {
- return -1;
+ if( expectedChecksum!=checksum.getValue() ) {
+ return -1;
+ }
}
-
+ return size;
}
- return size;
}
-
void addToTotalLength(int size) {
totalLength.addAndGet(size);
}
@@ -784,6 +795,7 @@ public class Journal {
public void setReplicationTarget(ReplicationTarget replicationTarget) {
this.replicationTarget = replicationTarget;
}
+
public ReplicationTarget getReplicationTarget() {
return replicationTarget;
}
@@ -869,10 +881,12 @@ public class Journal {
hash = (int)(file ^ offset);
}
+ @Override
public int hashCode() {
return hash;
}
+ @Override
public boolean equals(Object obj) {
if (obj instanceof WriteKey) {
WriteKey di = (WriteKey)obj;