You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/02/11 16:27:39 UTC
[1/4] activemq git commit:
https://issues.apache.org/jira/browse/AMQ-5578 - implement for kahadb
Repository: activemq
Updated Branches:
refs/heads/master 01d015514 -> 25376afac
https://issues.apache.org/jira/browse/AMQ-5578 - implement for kahadb
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/95f7262c
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/95f7262c
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/95f7262c
Branch: refs/heads/master
Commit: 95f7262cb198fa25bf58f2790584be0f0a230228
Parents: 8cf98a0
Author: gtully <ga...@gmail.com>
Authored: Wed Feb 11 13:21:47 2015 +0000
Committer: gtully <ga...@gmail.com>
Committed: Wed Feb 11 13:37:32 2015 +0000
----------------------------------------------------------------------
.../CallerBufferingDataFileAppender.java | 4 -
.../store/kahadb/disk/journal/DataFile.java | 3 +-
.../kahadb/disk/journal/DataFileAccessor.java | 1 -
.../kahadb/disk/journal/DataFileAppender.java | 24 ++--
.../store/kahadb/disk/journal/Journal.java | 109 ++-----------------
.../kahadb/disk/journal/ReadOnlyDataFile.java | 4 +-
.../kahadb/disk/journal/ReadOnlyJournal.java | 2 +-
.../util/RecoverableRandomAccessFile.java | 7 +-
.../FilePendingMessageCursorTestSupport.java | 9 ++
9 files changed, 39 insertions(+), 124 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/95f7262c/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java
index d7c4a28..c6b143b 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/CallerBufferingDataFileAppender.java
@@ -111,14 +111,10 @@ class CallerBufferingDataFileAppender extends DataFileAppender {
wb = (WriteBatch)o;
if (dataFile != wb.dataFile) {
if (file != null) {
- file.setLength(dataFile.getLength());
dataFile.closeRandomAccessFile(file);
}
dataFile = wb.dataFile;
file = dataFile.openRandomAccessFile();
- if( file.length() < journal.preferedFileLength ) {
- file.setLength(journal.preferedFileLength);
- }
}
final DataByteArrayOutputStream buff = wb.buff;
http://git-wip-us.apache.org/repos/asf/activemq/blob/95f7262c/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
index ed3f312..1c5ee3a 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFile.java
@@ -18,7 +18,6 @@ package org.apache.activemq.store.kahadb.disk.journal;
import java.io.File;
import java.io.IOException;
-import java.io.RandomAccessFile;
import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
@@ -37,7 +36,7 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
protected volatile int length;
protected final SequenceSet corruptedBlocks = new SequenceSet();
- DataFile(File file, int number, int preferedSize) {
+ DataFile(File file, int number) {
this.file = file;
this.dataFileId = Integer.valueOf(number);
length = (int)(file.exists() ? file.length() : 0);
http://git-wip-us.apache.org/repos/asf/activemq/blob/95f7262c/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
index 2046031..4832fdc 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAccessor.java
@@ -38,7 +38,6 @@ final class DataFileAccessor {
/**
* Construct a Store reader
*
- * @param fileId
* @throws IOException
*/
public DataFileAccessor(Journal dataManager, DataFile dataFile) throws IOException {
http://git-wip-us.apache.org/repos/asf/activemq/blob/95f7262c/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
index 969584e..f5d7e10 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/DataFileAppender.java
@@ -182,7 +182,7 @@ class DataFileAppender implements FileAppender {
private WriteBatch enqueue(Journal.WriteCommand write) throws IOException {
synchronized (enqueueMutex) {
if (shutdown) {
- throw new IOException("Async Writter Thread Shutdown");
+ throw new IOException("Async Writer Thread Shutdown");
}
if (!running) {
@@ -207,7 +207,7 @@ class DataFileAppender implements FileAppender {
while ( true ) {
if (nextWriteBatch == null) {
DataFile file = journal.getCurrentWriteFile();
- if( file.getLength() > journal.getMaxFileLength() ) {
+ if( file.getLength() + write.location.getSize() >= journal.getMaxFileLength() ) {
file = journal.rotateWriteFile();
}
@@ -226,7 +226,7 @@ class DataFileAppender implements FileAppender {
final long start = System.currentTimeMillis();
enqueueMutex.wait();
if (maxStat > 0) {
- logger.info("Watiting for write to finish with full batch... millis: " +
+ logger.info("Waiting for write to finish with full batch... millis: " +
(System.currentTimeMillis() - start));
}
}
@@ -234,7 +234,7 @@ class DataFileAppender implements FileAppender {
throw new InterruptedIOException();
}
if (shutdown) {
- throw new IOException("Async Writter Thread Shutdown");
+ throw new IOException("Async Writer Thread Shutdown");
}
}
}
@@ -273,6 +273,7 @@ class DataFileAppender implements FileAppender {
int statIdx = 0;
int[] stats = new int[maxStat];
+ final byte[] end = new byte[]{0};
/**
* The async processing loop that writes to the data files and does the
* force calls. Since the file sync() call is the slowest of all the
@@ -308,14 +309,13 @@ class DataFileAppender implements FileAppender {
if (dataFile != wb.dataFile) {
if (file != null) {
- file.setLength(dataFile.getLength());
dataFile.closeRandomAccessFile(file);
}
dataFile = wb.dataFile;
file = dataFile.openRandomAccessFile();
- if( file.length() < journal.preferedFileLength ) {
- file.setLength(journal.preferedFileLength);
- }
+ // pre allocate on first open
+ file.seek(journal.maxFileLength-1);
+ file.write(end);
}
Journal.WriteCommand write = wb.writes.getHead();
@@ -337,15 +337,19 @@ class DataFileAppender implements FileAppender {
write = write.getNext();
}
+ // append 'unset' next batch (5 bytes) so read can always find eof
+ buff.writeInt(0);
+ buff.writeByte(0);
+
ByteSequence sequence = buff.toByteSequence();
// Now we can fill in the batch control record properly.
buff.reset();
buff.skip(5+Journal.BATCH_CONTROL_RECORD_MAGIC.length);
- buff.writeInt(sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
+ buff.writeInt(sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE - 5);
if( journal.isChecksum() ) {
Checksum checksum = new Adler32();
- checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE);
+ checksum.update(sequence.getData(), sequence.getOffset()+Journal.BATCH_CONTROL_RECORD_SIZE, sequence.getLength()-Journal.BATCH_CONTROL_RECORD_SIZE - 5);
buff.writeLong(checksum.getValue());
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/95f7262c/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
index 50d27cd..5541e9f 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/Journal.java
@@ -94,7 +94,6 @@ public class Journal {
protected boolean started;
protected int maxFileLength = DEFAULT_MAX_FILE_LENGTH;
- protected int preferedFileLength = DEFAULT_MAX_FILE_LENGTH - PREFERED_DIFF;
protected int writeBatchSize = DEFAULT_MAX_WRITE_BATCH_SIZE;
protected FileAppender appender;
@@ -128,7 +127,6 @@ public class Journal {
long start = System.currentTimeMillis();
accessorPool = new DataFileAccessorPool(this);
started = true;
- preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - PREFERED_DIFF);
appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);
@@ -144,7 +142,7 @@ public class Journal {
String n = file.getName();
String numStr = n.substring(filePrefix.length(), n.length()-fileSuffix.length());
int num = Integer.parseInt(numStr);
- DataFile dataFile = new DataFile(file, num, preferedFileLength);
+ DataFile dataFile = new DataFile(file, num);
fileMap.put(dataFile.getDataFileId(), dataFile);
totalLength.addAndGet(dataFile.getLength());
} catch (NumberFormatException e) {
@@ -178,6 +176,11 @@ public class Journal {
lastAppendLocation.set(recoveryCheck(df));
}
+ // ensure we don't report unused space of last journal file in size metric
+ if (totalLength.get() > maxFileLength && lastAppendLocation.get().getOffset() > 0) {
+ totalLength.addAndGet(lastAppendLocation.get().getOffset() - maxFileLength);
+ }
+
cleanupTask = new Runnable() {
public void run() {
cleanup();
@@ -330,8 +333,7 @@ public class Journal {
synchronized DataFile rotateWriteFile() {
int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
File file = getFile(nextNum);
- DataFile nextWriteFile = new DataFile(file, nextNum, preferedFileLength);
- // actually allocate the disk space
+ DataFile nextWriteFile = new DataFile(file, nextNum);
fileMap.put(nextWriteFile.getDataFileId(), nextWriteFile);
fileByFileMap.put(file, nextWriteFile);
dataFiles.addLast(nextWriteFile);
@@ -399,9 +401,9 @@ public class Journal {
boolean result = true;
for (Iterator<DataFile> i = fileMap.values().iterator(); i.hasNext();) {
DataFile dataFile = i.next();
- totalLength.addAndGet(-dataFile.getLength());
result &= dataFile.delete();
}
+ totalLength.set(0);
fileMap.clear();
fileByFileMap.clear();
lastAppendLocation.set(null);
@@ -479,26 +481,6 @@ public class Journal {
return directory.toString();
}
- public synchronized void appendedExternally(Location loc, int length) throws IOException {
- DataFile dataFile = null;
- if( dataFiles.getTail().getDataFileId() == loc.getDataFileId() ) {
- // It's an update to the current log file..
- dataFile = dataFiles.getTail();
- dataFile.incrementLength(length);
- } else if( dataFiles.getTail().getDataFileId()+1 == loc.getDataFileId() ) {
- // It's an update to the next log file.
- int nextNum = loc.getDataFileId();
- File file = getFile(nextNum);
- dataFile = new DataFile(file, nextNum, preferedFileLength);
- // actually allocate the disk space
- fileMap.put(dataFile.getDataFileId(), dataFile);
- fileByFileMap.put(file, dataFile);
- dataFiles.addLast(dataFile);
- } else {
- throw new IOException("Invalid external append.");
- }
- }
-
public synchronized Location getNextLocation(Location location) throws IOException, IllegalStateException {
Location cur = null;
@@ -547,7 +529,8 @@ public class Journal {
}
if (cur.getType() == 0) {
- return null;
+ // invalid offset - jump to next datafile
+ cur.setOffset(maxFileLength);
} else if (cur.getType() == USER_RECORD_TYPE) {
// Only return user records.
return cur;
@@ -555,62 +538,6 @@ public class Journal {
}
}
- public synchronized Location getNextLocation(File file, Location lastLocation, boolean thisFileOnly) throws IllegalStateException, IOException {
- DataFile df = fileByFileMap.get(file);
- return getNextLocation(df, lastLocation, thisFileOnly);
- }
-
- public synchronized Location getNextLocation(DataFile dataFile, Location lastLocation, boolean thisFileOnly) throws IOException, IllegalStateException {
-
- Location cur = null;
- while (true) {
- if (cur == null) {
- if (lastLocation == null) {
- DataFile head = dataFile.getHeadNode();
- cur = new Location();
- cur.setDataFileId(head.getDataFileId());
- cur.setOffset(0);
- } else {
- // Set to the next offset..
- cur = new Location(lastLocation);
- cur.setOffset(cur.getOffset() + cur.getSize());
- }
- } else {
- cur.setOffset(cur.getOffset() + cur.getSize());
- }
-
- // Did it go into the next file??
- if (dataFile.getLength() <= cur.getOffset()) {
- if (thisFileOnly) {
- return null;
- } else {
- dataFile = getNextDataFile(dataFile);
- if (dataFile == null) {
- return null;
- } else {
- cur.setDataFileId(dataFile.getDataFileId().intValue());
- cur.setOffset(0);
- }
- }
- }
-
- // Load in location size and type.
- DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
- try {
- reader.readLocationDetails(cur);
- } finally {
- accessorPool.closeDataFileAccessor(reader);
- }
-
- if (cur.getType() == 0) {
- return null;
- } else if (cur.getType() > 0) {
- // Only return user records.
- return cur;
- }
- }
- }
-
public synchronized ByteSequence read(Location location) throws IOException, IllegalStateException {
DataFile dataFile = getDataFile(location);
DataFileAccessor reader = accessorPool.openDataFileAccessor(dataFile);
@@ -713,21 +640,7 @@ public class Journal {
}
public long getDiskSize() {
- long tailLength=0;
- synchronized( this ) {
- if( !dataFiles.isEmpty() ) {
- tailLength = dataFiles.getTail().getLength();
- }
- }
-
- long rc = totalLength.get();
-
- // The last file is actually at a minimum preferedFileLength big.
- if( tailLength < preferedFileLength ) {
- rc -= tailLength;
- rc += preferedFileLength;
- }
- return rc;
+ return totalLength.get();
}
public void setReplicationTarget(ReplicationTarget replicationTarget) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/95f7262c/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/ReadOnlyDataFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/ReadOnlyDataFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/ReadOnlyDataFile.java
index fe8941a..9d7ef62 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/ReadOnlyDataFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/ReadOnlyDataFile.java
@@ -26,8 +26,8 @@ import java.io.RandomAccessFile;
*/
public class ReadOnlyDataFile extends DataFile {
- ReadOnlyDataFile(File file, int number, int preferedSize) {
- super(file, number, preferedSize);
+ ReadOnlyDataFile(File file, int number) {
+ super(file, number);
}
public RandomAccessFile openRandomAccessFile(boolean appender) throws IOException {
http://git-wip-us.apache.org/repos/asf/activemq/blob/95f7262c/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/ReadOnlyJournal.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/ReadOnlyJournal.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/ReadOnlyJournal.java
index 00c52d5..030e8b7 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/ReadOnlyJournal.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/disk/journal/ReadOnlyJournal.java
@@ -60,7 +60,7 @@ public class ReadOnlyJournal extends Journal {
String n = file.getName();
String numStr = n.substring(filePrefix.length(), n.length());
int num = Integer.parseInt(numStr);
- DataFile dataFile = new ReadOnlyDataFile(file, num, preferedFileLength);
+ DataFile dataFile = new ReadOnlyDataFile(file, num);
fileMap.put(dataFile.getDataFileId(), dataFile);
totalLength.addAndGet(dataFile.getLength());
} catch (NumberFormatException e) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/95f7262c/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
----------------------------------------------------------------------
diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
index 1b0cb4c..636890e 100644
--- a/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
+++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/util/RecoverableRandomAccessFile.java
@@ -371,12 +371,7 @@ public class RecoverableRandomAccessFile implements java.io.DataOutput, java.io.
}
public void setLength(long length) throws IOException {
- try {
- getRaf().setLength(length);
- } catch (IOException ioe) {
- handleException();
- throw ioe;
- }
+ throw new IllegalStateException("File size is pre allocated");
}
public void seek(long pos) throws IOException {
http://git-wip-us.apache.org/repos/asf/activemq/blob/95f7262c/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java
index 123263d..ac341a2 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/region/cursors/FilePendingMessageCursorTestSupport.java
@@ -58,6 +58,15 @@ public class FilePendingMessageCursorTestSupport {
public void testAddToEmptyCursorWhenTempStoreIsFull() throws Exception {
createBrokerWithTempStoreLimit();
SystemUsage usage = brokerService.getSystemUsage();
+
+ PList dud = brokerService.getTempDataStore().getPList("dud");
+ // fill the temp store
+ int id=0;
+ ByteSequence payload = new ByteSequence(new byte[1024]);
+ while (!usage.getTempUsage().isFull()) {
+ dud.addFirst("A-" + (++id), payload);
+ }
+
assertTrue("temp store is full: %" + usage.getTempUsage().getPercentUsage(), usage.getTempUsage().isFull());
underTest = new FilePendingMessageCursor(brokerService.getBroker(), "test", false);
[4/4] activemq git commit: reduce test duration,
relax derbydb durability - AMQ5266SingleDestTest
Posted by gt...@apache.org.
reduce test duration, relax derbydb durability - AMQ5266SingleDestTest
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/25376afa
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/25376afa
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/25376afa
Branch: refs/heads/master
Commit: 25376afac1d504e5c068ca44d6a47cedf188cfe5
Parents: 95f7262
Author: gtully <ga...@gmail.com>
Authored: Wed Feb 11 15:27:45 2015 +0000
Committer: gtully <ga...@gmail.com>
Committed: Wed Feb 11 15:27:45 2015 +0000
----------------------------------------------------------------------
.../src/test/java/org/apache/activemq/TestSupport.java | 6 ++++--
.../java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java | 6 ++++++
2 files changed, 10 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/25376afa/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
index 7ebe11c..90c1bd2 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
@@ -183,7 +183,9 @@ public abstract class TestSupport extends CombinationTestSupport {
PersistenceAdapter adapter = null;
switch (choice) {
case JDBC:
- adapter = new JDBCPersistenceAdapter();
+ JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter();
+ jdbcPersistenceAdapter.setUseLock(false); // rollback (at shutdown) on derby can take a long time with file io etc
+ adapter = jdbcPersistenceAdapter;
break;
case KahaDB:
adapter = new KahaDBPersistenceAdapter();
@@ -196,7 +198,7 @@ public abstract class TestSupport extends CombinationTestSupport {
break;
}
broker.setPersistenceAdapter(adapter);
- adapter.setDirectory(broker.getBrokerDataDirectory());
+ adapter.setDirectory(new File(broker.getBrokerDataDirectory(), choice.name()));
return adapter;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/25376afa/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
index cfd6534..0d7f44b 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5266SingleDestTest.java
@@ -44,6 +44,7 @@ import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -97,6 +98,11 @@ public class AMQ5266SingleDestTest {
public int consumerBatchSize = 25;
+ @BeforeClass
+ public static void derbyTestMode() throws Exception {
+ System.setProperty("derby.system.durability","test");
+ }
+
@Before
public void startBroker() throws Exception {
brokerService = new BrokerService();
[3/4] activemq git commit: tidy up some jdbc directory usage - move
into target
Posted by gt...@apache.org.
tidy up some jdbc directory usage - move into target
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c6837ace
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c6837ace
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c6837ace
Branch: refs/heads/master
Commit: c6837acefe8ca466d8cd9b3bd9adf297372b65b8
Parents: 01d0155
Author: gtully <ga...@gmail.com>
Authored: Mon Feb 9 12:29:52 2015 +0000
Committer: gtully <ga...@gmail.com>
Committed: Wed Feb 11 13:37:32 2015 +0000
----------------------------------------------------------------------
.../java/org/apache/activemq/TestSupport.java | 1 +
.../org/apache/activemq/broker/AMQ4351Test.java | 4 ++++
.../broker/JdbcXARecoveryBrokerTest.java | 3 +++
.../org/apache/activemq/bugs/AMQ4952Test.java | 8 ++++++++
.../store/jdbc/JDBCIOExceptionHandlerTest.java | 21 +++++++++++++++++---
.../store/jdbc/JDBCNegativeQueueTest.java | 9 ++++-----
6 files changed, 38 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/c6837ace/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
index 80aac14..26af596 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
@@ -196,6 +196,7 @@ public abstract class TestSupport extends CombinationTestSupport {
break;
}
broker.setPersistenceAdapter(adapter);
+ adapter.setDirectory(broker.getBrokerDataDirectory());
return adapter;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/c6837ace/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java
index e810f92..ce7ed3d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/AMQ4351Test.java
@@ -20,12 +20,14 @@ import junit.framework.Test;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.util.IOHelper;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.Connection;
import javax.jms.*;
+import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
@@ -58,6 +60,8 @@ public class AMQ4351Test extends BrokerTestSupport {
broker.setOfflineDurableSubscriberTaskSchedule(500);
broker.setOfflineDurableSubscriberTimeout(2000); // lets delete durable subs much faster.
+ System.setProperty("derby.system.home", new File(IOHelper.getDefaultDataDirectory()).getCanonicalPath());
+
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
EmbeddedDataSource dataSource = new EmbeddedDataSource();
dataSource.setDatabaseName("derbyDb");
http://git-wip-us.apache.org/repos/asf/activemq/blob/c6837ace/activemq-unit-tests/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java
index 5788dad..35b944a 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/JdbcXARecoveryBrokerTest.java
@@ -16,10 +16,12 @@
*/
package org.apache.activemq.broker;
+import java.io.File;
import junit.framework.Test;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.util.IOHelper;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.apache.derby.jdbc.EmbeddedXADataSource;
@@ -29,6 +31,7 @@ public class JdbcXARecoveryBrokerTest extends XARecoveryBrokerTest {
@Override
protected void setUp() throws Exception {
+ System.setProperty("derby.system.home", new File(IOHelper.getDefaultDataDirectory()).getCanonicalPath());
dataSource = new EmbeddedXADataSource();
dataSource.setDatabaseName("derbyDb");
dataSource.setCreateDatabase("create");
http://git-wip-us.apache.org/repos/asf/activemq/blob/c6837ace/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
index 6a52e46..dd26e81 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4952Test.java
@@ -17,6 +17,7 @@
package org.apache.activemq.bugs;
+import java.io.File;
import java.net.URI;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -55,11 +56,13 @@ import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.Wait;
import org.apache.derby.jdbc.EmbeddedDataSource;
import org.junit.After;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@@ -125,6 +128,11 @@ public class AMQ4952Test {
return Arrays.asList(new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } });
}
+ @BeforeClass
+ public static void dbHomeSysProp() throws Exception {
+ System.setProperty("derby.system.home", new File(IOHelper.getDefaultDataDirectory()).getCanonicalPath());
+ }
+
@Test
public void testConsumerBrokerRestart() throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq/blob/c6837ace/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java
index df10d73..e95bcbd 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCIOExceptionHandlerTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.store.jdbc;
+import java.io.File;
import java.io.PrintWriter;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
@@ -24,23 +25,28 @@ import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
-import junit.framework.TestCase;
-
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.LeaseLockerIOExceptionHandler;
import org.apache.activemq.util.Wait;
import org.apache.derby.jdbc.EmbeddedDataSource;
+import org.junit.Before;
+import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import static junit.framework.Assert.assertTrue;
+import static junit.framework.Assert.fail;
+
/**
* Test to see if the JDBCExceptionIOHandler will restart the transport connectors correctly after
* the underlying DB has been stopped and restarted
*
* see AMQ-4575
*/
-public class JDBCIOExceptionHandlerTest extends TestCase {
+public class JDBCIOExceptionHandlerTest {
private static final Logger LOG = LoggerFactory.getLogger(JDBCIOExceptionHandlerTest.class);
private static final String TRANSPORT_URL = "tcp://0.0.0.0:0";
@@ -50,6 +56,11 @@ public class JDBCIOExceptionHandlerTest extends TestCase {
private ReconnectingEmbeddedDataSource dataSource;
private BrokerService broker;
+ @Before
+ public void dbHomeSysProp() throws Exception {
+ System.setProperty("derby.system.home", new File(IOHelper.getDefaultDataDirectory()).getCanonicalPath());
+ }
+
protected BrokerService createBroker(boolean withJMX) throws Exception {
return createBroker("localhost", withJMX, true, true);
}
@@ -94,6 +105,7 @@ public class JDBCIOExceptionHandlerTest extends TestCase {
/*
* run test without JMX enabled
*/
+ @Test
public void testRecoverWithOutJMX() throws Exception {
recoverFromDisconnectDB(false);
}
@@ -101,14 +113,17 @@ public class JDBCIOExceptionHandlerTest extends TestCase {
/*
* run test with JMX enabled
*/
+ @Test
public void testRecoverWithJMX() throws Exception {
recoverFromDisconnectDB(true);
}
+ @Test
public void testSlaveStoppedLease() throws Exception {
testSlaveStopped(true);
}
+ @Test
public void testSlaveStoppedDefault() throws Exception {
testSlaveStopped(false);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/c6837ace/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCNegativeQueueTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCNegativeQueueTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCNegativeQueueTest.java
index 998ff7c..0a2ca1d 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCNegativeQueueTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/jdbc/JDBCNegativeQueueTest.java
@@ -23,22 +23,21 @@ import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
+import javax.sql.DataSource;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.cursors.NegativeQueueTest;
import org.apache.derby.jdbc.EmbeddedDataSource;
public class JDBCNegativeQueueTest extends NegativeQueueTest {
- EmbeddedDataSource dataSource;
+ DataSource dataSource;
protected void configureBroker(BrokerService answer) throws Exception {
super.configureBroker(answer);
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter();
- dataSource = new EmbeddedDataSource();
- dataSource.setDatabaseName("derbyDb");
- dataSource.setCreateDatabase("create");
- jdbc.setDataSource(dataSource);
+ jdbc.setDataSource(dataSource);
answer.setPersistenceAdapter(jdbc);
+ dataSource = jdbc.getDataSource();
}
protected void tearDown() throws Exception {
[2/4] activemq git commit: rework
https://issues.apache.org/jira/browse/AMQ-3684 and
https://issues.apache.org/jira/browse/AMQ-4532 to avoid intermittent hangs,
processing shutdown wile shutdown is in progress - AMQ1936Test and AMQ2021Test
- using just
Posted by gt...@apache.org.
rework https://issues.apache.org/jira/browse/AMQ-3684 and https://issues.apache.org/jira/browse/AMQ-4532 to avoid intermittent hangs, processing shutdown wile shutdown is in progress - AMQ1936Test and AMQ2021Test - using just TransportDisposedIOException to propagate exception response and start shutdown process and ignoring broker side for logging
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8cf98a07
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8cf98a07
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8cf98a07
Branch: refs/heads/master
Commit: 8cf98a070f0fb60857e85cb74c4969d0256ebb0e
Parents: c6837ac
Author: gtully <ga...@gmail.com>
Authored: Wed Feb 11 13:13:06 2015 +0000
Committer: gtully <ga...@gmail.com>
Committed: Wed Feb 11 13:37:32 2015 +0000
----------------------------------------------------------------------
.../activemq/broker/TransportConnection.java | 10 ++++----
.../activemq/transport/vm/VMTransport.java | 10 +-------
.../java/org/apache/activemq/TestSupport.java | 2 +-
.../org/apache/activemq/bugs/AMQ2902Test.java | 2 +-
.../transport/vm/VMTransportThreadSafeTest.java | 25 --------------------
5 files changed, 9 insertions(+), 40 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/8cf98a07/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 270ed9f..5da0cfa 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -239,10 +239,12 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
if (!stopping.get() && !pendingStop) {
transportException.set(e);
- if (TRANSPORTLOG.isDebugEnabled()) {
- TRANSPORTLOG.debug(this + " failed: " + e, e);
- } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
- TRANSPORTLOG.warn(this + " failed: " + e);
+ if (! (e instanceof TransportDisposedIOException)) {
+ if (TRANSPORTLOG.isDebugEnabled()) {
+ TRANSPORTLOG.debug(this + " failed: " + e, e);
+ } else if (TRANSPORTLOG.isWarnEnabled() && !expected(e)) {
+ TRANSPORTLOG.warn(this + " failed: " + e);
+ }
}
stopAsync();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/8cf98a07/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
index 75bd6fe..7b4e1a9 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
@@ -194,15 +194,7 @@ public class VMTransport implements Transport, Task {
}
if (peer.transportListener != null) {
- // let the peer know that we are disconnecting after attempting
- // to cleanly shutdown the async tasks so that this is the last
- // command it see's.
- try {
- peer.transportListener.onCommand(new ShutdownInfo());
- } catch (Exception ignore) {
- }
-
- // let any requests pending a response see an exception
+ // let any requests pending a response see an exception and shutdown
try {
peer.transportListener.onException(new TransportDisposedIOException("peer (" + this + ") stopped."));
} catch (Exception ignore) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/8cf98a07/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
index 26af596..7ebe11c 100755
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/TestSupport.java
@@ -173,7 +173,7 @@ public abstract class TestSupport extends CombinationTestSupport {
regionBroker.getTopicRegion().getDestinationMap();
}
- public static enum PersistenceAdapterChoice {LevelDB, KahaDB, AMQ, JDBC, MEM };
+ public static enum PersistenceAdapterChoice {LevelDB, KahaDB, JDBC, MEM };
public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException {
return setPersistenceAdapter(broker, defaultPersistenceAdapter);
http://git-wip-us.apache.org/repos/asf/activemq/blob/8cf98a07/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2902Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2902Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2902Test.java
index 3c38186..39e0407 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2902Test.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2902Test.java
@@ -75,7 +75,7 @@ public class AMQ2902Test extends TestCase {
public void testNoExceptionOnClose() throws JMSException {
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
- "vm://localhost?broker.persistent=false");
+ "vm://localhostTwo?broker.persistent=false");
Connection connection = connectionFactory.createConnection();
connection.close();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/8cf98a07/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
index 8534f89..ea3d833 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportThreadSafeTest.java
@@ -263,31 +263,6 @@ public class VMTransportThreadSafeTest {
}
@Test(timeout=60000)
- public void testStopSendsShutdownToPeer() throws Exception {
-
- final VMTransport local = new VMTransport(new URI(location1));
- final VMTransport remote = new VMTransport(new URI(location2));
-
- local.setPeer(remote);
- remote.setPeer(local);
-
- final VMTestTransportListener remoteListener = new VMTestTransportListener(remoteReceived);
-
- local.setTransportListener(new VMTestTransportListener(localReceived));
- remote.setTransportListener(remoteListener);
-
- local.start();
- local.stop();
-
- assertTrue(Wait.waitFor(new Wait.Condition() {
- @Override
- public boolean isSatisified() throws Exception {
- return remoteListener.shutdownReceived;
- }
- }));
- }
-
- @Test(timeout=60000)
public void testRemoteStopSendsExceptionToPendingRequests() throws Exception {
final VMTransport local = new VMTransport(new URI(location1));