You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/08/23 15:34:10 UTC

svn commit: r1160681 - in /activemq/trunk/kahadb/src: main/java/org/apache/kahadb/page/PageFile.java main/java/org/apache/kahadb/page/Transaction.java test/java/org/apache/kahadb/index/BTreeIndexTest.java

Author: gtully
Date: Tue Aug 23 13:34:09 2011
New Revision: 1160681

URL: http://svn.apache.org/viewvc?rev=1160681&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3466: IndexOutOfBounds in KahaDB with a large number of subscriptions and pending messages. Use long locations so that temp file appends do not overflow, and ensure that page file overflow does not leave an oversized chunk: keep linking pages until the overflow fits in a page. Also adds some additional tests.

Modified:
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
    activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=1160681&r1=1160680&r2=1160681&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Tue Aug 23 13:34:09 2011
@@ -140,8 +140,8 @@ public class PageFile {
         Page page;
         byte[] current;
         byte[] diskBound;
-        int currentLocation = -1;
-        int diskBoundLocation = -1;
+        long currentLocation = -1;
+        long diskBoundLocation = -1;
         File tmpFile;
         int length;
 
@@ -150,7 +150,7 @@ public class PageFile {
             current=data;
         }
 
-        public PageWrite(Page page, int currentLocation, int length, File tmpFile) {
+        public PageWrite(Page page, long currentLocation, int length, File tmpFile) {
             this.page = page;
             this.currentLocation = currentLocation;
             this.tmpFile = tmpFile;
@@ -164,7 +164,7 @@ public class PageFile {
             diskBoundLocation = -1;
         }
 
-        public void setCurrentLocation(Page page, int location, int length) {
+        public void setCurrentLocation(Page page, long location, int length) {
             this.page = page;
             this.currentLocation = location;
             this.length = length;
@@ -186,7 +186,7 @@ public class PageFile {
                 diskBound = new byte[length];
                 RandomAccessFile file = new RandomAccessFile(tmpFile, "r");
                 file.seek(diskBoundLocation);
-                int readNum = file.read(diskBound);
+                file.read(diskBound);
                 file.close();
                 diskBoundLocation = -1;
             }

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java?rev=1160681&r1=1160680&r2=1160681&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java Tue Aug 23 13:34:09 2011
@@ -30,7 +30,7 @@ public class Transaction implements Iter
 
     private RandomAccessFile tmpFile;
     private File txFile;
-    private int nextLocation = 0;
+    private long nextLocation = 0;
 
     /**
      * The PageOverflowIOException occurs when a page write is requested
@@ -277,36 +277,38 @@ public class Transaction implements Iter
                     // If overflow is allowed
                     if (overflow) {
 
-                        Page next;
-                        if (current.getType() == Page.PAGE_PART_TYPE) {
-                            next = load(current.getNext(), null);
-                        } else {
-                            next = allocate();
-                        }
-
-                        next.txId = current.txId;
-
-                        // Write the page header
-                        int oldPos = pos;
-                        pos = 0;
-
-                        current.makePagePart(next.getPageId(), getWriteTransactionId());
-                        current.write(this);
-
-                        // Do the page write..
-                        byte[] data = new byte[pageSize];
-                        System.arraycopy(buf, 0, data, 0, pageSize);
-                        Transaction.this.write(current, data);
-
-                        // Reset for the next page chunk
-                        pos = 0;
-                        // The page header marshalled after the data is written.
-                        skip(Page.PAGE_HEADER_SIZE);
-                        // Move the overflow data after the header.
-                        System.arraycopy(buf, pageSize, buf, pos, oldPos - pageSize);
-                        pos += oldPos - pageSize;
-                        current = next;
+                        do {
+                            Page next;
+                            if (current.getType() == Page.PAGE_PART_TYPE) {
+                                next = load(current.getNext(), null);
+                            } else {
+                                next = allocate();
+                            }
+
+                            next.txId = current.txId;
+
+                            // Write the page header
+                            int oldPos = pos;
+                            pos = 0;
+
+                            current.makePagePart(next.getPageId(), getWriteTransactionId());
+                            current.write(this);
+
+                            // Do the page write..
+                            byte[] data = new byte[pageSize];
+                            System.arraycopy(buf, 0, data, 0, pageSize);
+                            Transaction.this.write(current, data);
+
+                            // Reset for the next page chunk
+                            pos = 0;
+                            // The page header marshalled after the data is written.
+                            skip(Page.PAGE_HEADER_SIZE);
+                            // Move the overflow data after the header.
+                            System.arraycopy(buf, pageSize, buf, pos, oldPos - pageSize);
+                            pos += oldPos - pageSize;
+                            current = next;
 
+                        } while (pos > pageSize);
                     } else {
                         throw new PageOverflowIOException("Page overflow.");
                     }
@@ -705,7 +707,7 @@ public class Transaction implements Iter
             if (tmpFile == null) {
                 tmpFile = new RandomAccessFile(getTempFile(), "rw");
             }
-            int location = nextLocation;
+            long location = nextLocation;
             tmpFile.seek(nextLocation);
             tmpFile.write(data);
             nextLocation = location + data.length;

Modified: activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java?rev=1160681&r1=1160680&r2=1160681&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java (original)
+++ activemq/trunk/kahadb/src/test/java/org/apache/kahadb/index/BTreeIndexTest.java Tue Aug 23 13:34:09 2011
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
+import org.apache.kahadb.page.PageFile;
 import org.apache.kahadb.util.LongMarshaller;
 import org.apache.kahadb.util.StringMarshaller;
 import org.apache.kahadb.util.VariableMarshaller;
@@ -220,8 +221,13 @@ public class BTreeIndexTest extends Inde
         index.remove(tx, key(1566));
     }
 
-    public void x_testLargeValue() throws Exception {
-        createPageFileAndIndex(4*1024);
+    public void testLargeValue() throws Exception {
+        //System.setProperty("maxKahaDBTxSize", "" + (1024*1024*1024));
+        pf = new PageFile(directory, getClass().getName());
+        pf.setPageSize(4*1024);
+        pf.setEnablePageCaching(false);
+        pf.load();
+        tx = pf.tx();
         long id = tx.allocate().getPageId();
         tx.commit();
 
@@ -232,9 +238,9 @@ public class BTreeIndexTest extends Inde
         tx.commit();
 
         tx =  pf.tx();
-        String val = new String(new byte[93]);
-        final long numMessages = 2000;
-        final int numConsumers = 10000;
+        String val = new String(new byte[1024]);
+        final long numMessages = 10;
+        final int numConsumers = 200;
 
         for (long i=0; i<numMessages; i++) {
             HashSet<String> hs = new HashSet<String>();
@@ -243,13 +249,57 @@ public class BTreeIndexTest extends Inde
             }
             test.put(tx, i, hs);
         }
+        tx.commit();
+        tx =  pf.tx();
+        for (long i=0; i<numMessages; i++) {
+            HashSet<String> hs = new HashSet<String>();
+            for (int j=numConsumers; j<numConsumers*2;j++) {
+                hs.add(val + "SOME TEXT" + j);
+            }
+            test.put(tx, i, hs);
+        }
 
+        tx.commit();
+        tx =  pf.tx();
         for (long i=0; i<numMessages; i++) {
             test.get(tx, i);
         }
         tx.commit();
     }
 
+    public void testLargeValueOverflow() throws Exception {
+        pf = new PageFile(directory, getClass().getName());
+        pf.setPageSize(4*1024);
+        pf.setEnablePageCaching(false);
+        pf.setWriteBatchSize(1);
+        pf.load();
+        tx = pf.tx();
+        long id = tx.allocate().getPageId();
+
+        BTreeIndex<Long, String> test = new BTreeIndex<Long, String>(pf, id);
+        test.setKeyMarshaller(LongMarshaller.INSTANCE);
+        test.setValueMarshaller(StringMarshaller.INSTANCE);
+        test.load(tx);
+        tx.commit();
+
+        final int stringSize = 6*1024;
+        tx =  pf.tx();
+        String val = new String(new byte[stringSize]);
+        final long numMessages = 1;
+
+        for (long i=0; i<numMessages; i++) {
+            test.put(tx, i, val);
+        }
+        tx.commit();
+
+        tx =  pf.tx();
+        for (long i=0; i<numMessages; i++) {
+            String s = test.get(tx, i);
+            assertEquals("len is as expected", stringSize, s.length());
+        }
+        tx.commit();
+    }
+
     void doInsertReverse(int count) throws Exception {
         for (int i = count-1; i >= 0; i--) {
             index.put(tx, key(i), (long)i);