You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/12/04 14:04:34 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5438 -
improve kahadb archive logs. This closes #50
Repository: activemq
Updated Branches:
refs/heads/trunk 5d77b395f -> 802e527ea
https://issues.apache.org/jira/browse/AMQ-5438 - improve kahadb archive logs. This closes #50
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/802e527e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/802e527e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/802e527e
Branch: refs/heads/trunk
Commit: 802e527ea4f634ffb10e23cef02235ddfb0397fe
Parents: 5d77b39
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Thu Dec 4 14:04:23 2014 +0100
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Thu Dec 4 14:04:23 2014 +0100
----------------------------------------------------------------------
.../kahadb/MultiKahaDBPersistenceAdapter.java | 11 ++--
.../kahadb/MultiKahaDBTransactionStore.java | 1 +
.../store/kahadb/disk/journal/DataFile.java | 2 +-
.../store/kahadb/disk/journal/Journal.java | 57 ++++++++++++++------
.../src/test/resources/log4j.properties | 2 +
5 files changed, 50 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/802e527e/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
index eca83e8..f22e544 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java
@@ -20,12 +20,7 @@ import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
import javax.transaction.xa.Xid;
@@ -518,6 +513,10 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
return transactionStore.getJournalMaxWriteBatchSize();
}
+ public List<PersistenceAdapter> getAdapters() {
+ return Collections.unmodifiableList(adapters);
+ }
+
@Override
public String toString() {
String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";
http://git-wip-us.apache.org/repos/asf/activemq/blob/802e527e/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
index 8840a1d..880faff 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBTransactionStore.java
@@ -444,4 +444,5 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
}
destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
}
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/802e527e/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
index d5762d2..ed3f312 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
@@ -81,7 +81,7 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
}
public synchronized void move(File targetDirectory) throws IOException{
- IOHelper.moveFile(file,targetDirectory);
+ IOHelper.moveFile(file, targetDirectory);
}
public SequenceSet getCorruptedBlocks() {
http://git-wip-us.apache.org/repos/asf/activemq/blob/802e527e/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index 6431acc..50d27cd 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -20,23 +20,14 @@ import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.TreeMap;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
+import org.apache.activemq.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.activemq.util.ByteSequence;
@@ -95,7 +86,9 @@ public class Journal {
protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
protected File directory = new File(DEFAULT_DIRECTORY);
- protected File directoryArchive = new File(DEFAULT_ARCHIVE_DIRECTORY);
+ protected File directoryArchive;
+ private boolean directoryArchiveOverridden = false;
+
protected String filePrefix = DEFAULT_FILE_PREFIX;
protected String fileSuffix = DEFAULT_FILE_SUFFIX;
protected boolean started;
@@ -121,6 +114,12 @@ public class Journal {
protected boolean enableAsyncDiskSync = true;
private Timer timer;
+ public interface DataFileRemovedListener {
+ void fileRemoved(DataFile datafile);
+ }
+
+ private DataFileRemovedListener dataFileRemovedListener;
+
public synchronized void start() throws IOException {
if (started) {
return;
@@ -434,15 +433,31 @@ public class Journal {
totalLength.addAndGet(-dataFile.getLength());
dataFile.unlink();
if (archiveDataLogs) {
- dataFile.move(getDirectoryArchive());
- LOG.debug("moved data file " + dataFile + " to " + getDirectoryArchive());
+ File directoryArchive = getDirectoryArchive();
+ if (directoryArchive.exists()) {
+ LOG.debug("Archive directory exists: {}", directoryArchive);
+ } else {
+ if (directoryArchive.isAbsolute())
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Archive directory [{}] does not exist - creating it now",
+ directoryArchive.getAbsolutePath());
+ }
+ IOHelper.mkdirs(directoryArchive);
+ }
+ LOG.debug("Moving data file {} to {} ", dataFile, directoryArchive.getCanonicalPath());
+ dataFile.move(directoryArchive);
+ LOG.debug("Successfully moved data file");
} else {
+ LOG.debug("Deleting data file: {}", dataFile);
if ( dataFile.delete() ) {
- LOG.debug("Discarded data file " + dataFile);
+ LOG.debug("Discarded data file: {}", dataFile);
} else {
- LOG.warn("Failed to discard data file " + dataFile.getFile());
+ LOG.warn("Failed to discard data file : {}", dataFile.getFile());
}
}
+ if (dataFileRemovedListener != null) {
+ dataFileRemovedListener.fileRemoved(dataFile);
+ }
}
/**
@@ -657,10 +672,16 @@ public class Journal {
}
public File getDirectoryArchive() {
+ if (!directoryArchiveOverridden && (directoryArchive == null)) {
+ // create the directoryArchive relative to the journal location
+ directoryArchive = new File(directory.getAbsolutePath() +
+ File.separator + DEFAULT_ARCHIVE_DIRECTORY);
+ }
return directoryArchive;
}
public void setDirectoryArchive(File directoryArchive) {
+ directoryArchiveOverridden = true;
this.directoryArchive = directoryArchive;
}
@@ -760,6 +781,10 @@ public class Journal {
return enableAsyncDiskSync;
}
+ public void setDataFileRemovedListener(DataFileRemovedListener dataFileRemovedListener) {
+ this.dataFileRemovedListener = dataFileRemovedListener;
+ }
+
public static class WriteCommand extends LinkedNode<WriteCommand> {
public final Location location;
public final ByteSequence data;
http://git-wip-us.apache.org/repos/asf/activemq/blob/802e527e/activemq-unit-tests/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/log4j.properties b/activemq-unit-tests/src/test/resources/log4j.properties
index 85516aa..4704dbc 100755
--- a/activemq-unit-tests/src/test/resources/log4j.properties
+++ b/activemq-unit-tests/src/test/resources/log4j.properties
@@ -28,6 +28,8 @@ log4j.rootLogger=INFO, out, stdout
#log4j.logger.org.apache.activemq.store.kahadb=TRACE
#log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG
#log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG
+#log4j.logger.org.apache.activemq.store.kahadb.disk.journal=DEBUG
+#log4j.logger.org.apache.activemq.store.kahadb.AbstractKahaDBStore=DEBUG
# CONSOLE appender not used by default
log4j.appender.stdout=org.apache.log4j.ConsoleAppender