You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/08/23 15:34:10 UTC
svn commit: r1160681 - in /activemq/trunk/kahadb/src:
main/java/org/apache/kahadb/page/PageFile.java
main/java/org/apache/kahadb/page/Transaction.java
test/java/org/apache/kahadb/index/BTreeIndexTest.java
Author: gtully
Date: Tue Aug 23 13:34:09 2011
New Revision: 1160681
URL: http://svn.apache.org/viewvc?rev=1160681&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3466: IndexOutOfBounds in kahadb with large number of subscriptions and pending messages. use long locations such that temp file appends do not overflow and ensure page file overflow does not leave oversized chunk, link pages till overflow fits in a page, + some additional tests
Modified:
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=1160681&r1=1160680&r2=1160681&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Tue Aug 23 13:34:09 2011
@@ -140,8 +140,8 @@ public class PageFile {
Page page;
byte[] current;
byte[] diskBound;
- int currentLocation = -1;
- int diskBoundLocation = -1;
+ long currentLocation = -1;
+ long diskBoundLocation = -1;
File tmpFile;
int length;
@@ -150,7 +150,7 @@ public class PageFile {
current=data;
}
- public PageWrite(Page page, int currentLocation, int length, File tmpFile) {
+ public PageWrite(Page page, long currentLocation, int length, File tmpFile) {
this.page = page;
this.currentLocation = currentLocation;
this.tmpFile = tmpFile;
@@ -164,7 +164,7 @@ public class PageFile {
diskBoundLocation = -1;
}
- public void setCurrentLocation(Page page, int location, int length) {
+ public void setCurrentLocation(Page page, long location, int length) {
this.page = page;
this.currentLocation = location;
this.length = length;
@@ -186,7 +186,7 @@ public class PageFile {
diskBound = new byte[length];
RandomAccessFile file = new RandomAccessFile(tmpFile, "r");
file.seek(diskBoundLocation);
- int readNum = file.read(diskBound);
+ file.read(diskBound);
file.close();
diskBoundLocation = -1;
}
Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java?rev=1160681&r1=1160680&r2=1160681&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java Tue Aug 23 13:34:09 2011
@@ -30,7 +30,7 @@ public class Transaction implements Iter
private RandomAccessFile tmpFile;
private File txFile;
- private int nextLocation = 0;
+ private long nextLocation = 0;
/**
* The PageOverflowIOException occurs when a page write is requested
@@ -277,36 +277,38 @@ public class Transaction implements Iter
// If overflow is allowed
if (overflow) {
- Page next;
- if (current.getType() == Page.PAGE_PART_TYPE) {
- next = load(current.getNext(), null);
- } else {
- next = allocate();
- }
-
- next.txId = current.txId;
-
- // Write the page header
- int oldPos = pos;
- pos = 0;
-
- current.makePagePart(next.getPageId(), getWriteTransactionId());
- current.write(this);
-
- // Do the page write..
- byte[] data = new byte[pageSize];
- System.arraycopy(buf, 0, data, 0, pageSize);
- Transaction.this.write(current, data);
-
- // Reset for the next page chunk
- pos = 0;
- // The page header marshalled after the data is written.
- skip(Page.PAGE_HEADER_SIZE);
- // Move the overflow data after the header.
- System.arraycopy(buf, pageSize, buf, pos, oldPos - pageSize);
- pos += oldPos - pageSize;
- current = next;
+ do {
+ Page next;
+ if (current.getType() == Page.PAGE_PART_TYPE) {
+ next = load(current.getNext(), null);
+ } else {
+ next = allocate();
+ }
+
+ next.txId = current.txId;
+
+ // Write the page header
+ int oldPos = pos;
+ pos = 0;
+
+ current.makePagePart(next.getPageId(), getWriteTransactionId());
+ current.write(this);
+
+ // Do the page write..
+ byte[] data = new byte[pageSize];
+ System.arraycopy(buf, 0, data, 0, pageSize);
+ Transaction.this.write(current, data);
+
+ // Reset for the next page chunk
+ pos = 0;
+ // The page header marshalled after the data is written.
+ skip(Page.PAGE_HEADER_SIZE);
+ // Move the overflow data after the header.
+ System.arraycopy(buf, pageSize, buf, pos, oldPos - pageSize);
+ pos += oldPos - pageSize;
+ current = next;
+ } while (pos > pageSize);
} else {
throw new PageOverflowIOException("Page overflow.");
}
@@ -705,7 +707,7 @@ public class Transaction implements Iter
if (tmpFile == null) {
tmpFile = new RandomAccessFile(getTempFile(), "rw");
}
- int location = nextLocation;
+ long location = nextLocation;
tmpFile.seek(nextLocation);
tmpFile.write(data);
nextLocation = location + data.length;
Modified: activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java?rev=1160681&r1=1160680&r2=1160681&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java (original)
+++ activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java Tue Aug 23 13:34:09 2011
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
+import org.apache.kahadb.page.PageFile;
import org.apache.kahadb.util.LongMarshaller;
import org.apache.kahadb.util.StringMarshaller;
import org.apache.kahadb.util.VariableMarshaller;
@@ -220,8 +221,13 @@ public class BTreeIndexTest extends Inde
index.remove(tx, key(1566));
}
- public void x_testLargeValue() throws Exception {
- createPageFileAndIndex(4*1024);
+ public void testLargeValue() throws Exception {
+ //System.setProperty("maxKahaDBTxSize", "" + (1024*1024*1024));
+ pf = new PageFile(directory, getClass().getName());
+ pf.setPageSize(4*1024);
+ pf.setEnablePageCaching(false);
+ pf.load();
+ tx = pf.tx();
long id = tx.allocate().getPageId();
tx.commit();
@@ -232,9 +238,9 @@ public class BTreeIndexTest extends Inde
tx.commit();
tx = pf.tx();
- String val = new String(new byte[93]);
- final long numMessages = 2000;
- final int numConsumers = 10000;
+ String val = new String(new byte[1024]);
+ final long numMessages = 10;
+ final int numConsumers = 200;
for (long i=0; i<numMessages; i++) {
HashSet<String> hs = new HashSet<String>();
@@ -243,13 +249,57 @@ public class BTreeIndexTest extends Inde
}
test.put(tx, i, hs);
}
+ tx.commit();
+ tx = pf.tx();
+ for (long i=0; i<numMessages; i++) {
+ HashSet<String> hs = new HashSet<String>();
+ for (int j=numConsumers; j<numConsumers*2;j++) {
+ hs.add(val + "SOME TEXT" + j);
+ }
+ test.put(tx, i, hs);
+ }
+ tx.commit();
+ tx = pf.tx();
for (long i=0; i<numMessages; i++) {
test.get(tx, i);
}
tx.commit();
}
+ public void testLargeValueOverflow() throws Exception {
+ pf = new PageFile(directory, getClass().getName());
+ pf.setPageSize(4*1024);
+ pf.setEnablePageCaching(false);
+ pf.setWriteBatchSize(1);
+ pf.load();
+ tx = pf.tx();
+ long id = tx.allocate().getPageId();
+
+ BTreeIndex<Long, String> test = new BTreeIndex<Long, String>(pf, id);
+ test.setKeyMarshaller(LongMarshaller.INSTANCE);
+ test.setValueMarshaller(StringMarshaller.INSTANCE);
+ test.load(tx);
+ tx.commit();
+
+ final int stringSize = 6*1024;
+ tx = pf.tx();
+ String val = new String(new byte[stringSize]);
+ final long numMessages = 1;
+
+ for (long i=0; i<numMessages; i++) {
+ test.put(tx, i, val);
+ }
+ tx.commit();
+
+ tx = pf.tx();
+ for (long i=0; i<numMessages; i++) {
+ String s = test.get(tx, i);
+ assertEquals("len is as expected", stringSize, s.length());
+ }
+ tx.commit();
+ }
+
void doInsertReverse(int count) throws Exception {
for (int i = count-1; i >= 0; i--) {
index.put(tx, key(i), (long)i);