Posted to commits@activemq.apache.org by de...@apache.org on 2011/06/22 14:55:37 UTC

svn commit: r1138442 - in /activemq/trunk: activemq-core/src/test/java/org/apache/activemq/usecases/ activemq-web/src/main/java/org/apache/activemq/web/ kahadb/src/main/java/org/apache/kahadb/index/ kahadb/src/main/java/org/apache/kahadb/page/

Author: dejanb
Date: Wed Jun 22 12:55:36 2011
New Revision: 1138442

URL: http://svn.apache.org/viewvc?rev=1138442&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3374 - first stab at fixing long kahadb tx oom problem
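
In outline: before this change, every page updated inside a KahaDB Transaction was
buffered as a full byte[] page image in an in-memory map, so a single long-running
transaction (for example, a durable unsubscribe that has to clean up a large backlog)
could exhaust the heap. The patch below spills page writes to a per-transaction temp
file once the accumulated size passes a hard-coded 10MB threshold; past that point each
PageWrite carries only an (offset, length) into that file, and such batches are synced
to disk immediately. A minimal sketch of the access pattern that triggers the spill,
assuming the standard KahaDB API (PageFile.tx(), Transaction.load()/store()/commit());
the page ids and stringMarshaller are placeholders:

    // Illustration only: a transaction touching many pages. Before this patch,
    // each store() queued a PageWrite holding a full page image on the heap; with
    // it, once ~10MB of images accumulate they are appended to a <txid>.tmp file
    // in the page file's directory, and deleted again when the tx completes.
    Transaction tx = pageFile.tx();
    for (long pageId = 0; pageId < 100000; pageId++) {     // hypothetical ids
        Page<String> page = tx.load(pageId, stringMarshaller);
        page.set("updated");
        tx.store(page, stringMarshaller, true);
    }
    tx.commit();    // flushes the queued writes and removes the temp file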

Modified:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java
    activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java?rev=1138442&r1=1138441&r2=1138442&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/DurableUnsubscribeTest.java Wed Jun 22 12:55:36 2011
@@ -19,6 +19,7 @@ package org.apache.activemq.usecases;
 import java.lang.management.ManagementFactory;
 
 import javax.jms.Connection;
+import javax.jms.MessageProducer;
 import javax.jms.Session;
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
@@ -43,7 +44,15 @@ public class DurableUnsubscribeTest exte
         Destination d = broker.getDestination(topic);
         assertEquals("Subscription is missing.", 1, d.getConsumers().size());
 
+
         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = session.createProducer(topic);
+        for (int i = 0; i < 1000; i++) {
+            producer.send(session.createTextMessage("text"));
+        }
+
+        Thread.sleep(1000);
+
         session.unsubscribe("SubsId");
         session.close();
 
@@ -92,7 +101,7 @@ public class DurableUnsubscribeTest exte
 
     private void createBroker() throws Exception {
         broker = BrokerFactory.createBroker("broker:(vm://localhost)");
-        broker.setPersistent(false);
+        //broker.setPersistent(false);
         broker.setUseJmx(true);
         broker.setBrokerName(getName());
         broker.start();
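
(The test change above re-enables broker persistence and pushes 1000 messages through
the durable subscription before unsubscribing, so removing the subscription makes
KahaDB clean up a sizeable backlog inside a single index transaction; the
Thread.sleep(1000) presumably gives the broker time to finish processing the sends
before unsubscribe() runs.)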

Modified: activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java?rev=1138442&r1=1138441&r2=1138442&view=diff
==============================================================================
--- activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java (original)
+++ activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/WebClient.java Wed Jun 22 12:55:36 2011
@@ -61,7 +61,8 @@ import org.slf4j.LoggerFactory;
  * stored inside a HttpSession TODO controls to prevent DOS attacks with users
  * requesting many consumers TODO configure consumers with small prefetch.
  * 
- * 
+ *
+ *
  */
 public class WebClient implements HttpSessionActivationListener, HttpSessionBindingListener, Externalizable {
 

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java?rev=1138442&r1=1138441&r2=1138442&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/index/HashIndex.java Wed Jun 22 12:55:36 2011
@@ -345,7 +345,7 @@ public class HashIndex<Key,Value> implem
         tx.store(metadata.page, metadataMarshaller, true);
         calcThresholds();
 
-        LOG.debug("Resizing done.  New bins start at: "+metadata.binPageId);        
+        LOG.debug("Resizing done.  New bins start at: "+metadata.binPageId);
         resizeCapacity=0;
         resizePageId=0;
     }

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=1138442&r1=1138441&r2=1138442&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Wed Jun 22 12:55:36 2011
@@ -138,15 +138,35 @@ public class PageFile {
         Page page;
         byte[] current;
         byte[] diskBound;
+        int currentLocation = -1;
+        int diskBoundLocation = -1;
+        File tmpFile;
+        int length;
 
         public PageWrite(Page page, byte[] data) {
             this.page=page;
             current=data;
         }
+
+        public PageWrite(Page page, int currentLocation, int length, File tmpFile) {
+            this.page = page;
+            this.currentLocation = currentLocation;
+            this.tmpFile = tmpFile;
+            this.length = length;
+        }
                 
         public void setCurrent(Page page, byte[] data) {
             this.page=page;
             current=data;
+            currentLocation = -1;
+            diskBoundLocation = -1;
+        }
+
+        public void setCurrentLocation(Page page, int location, int length) {
+            this.page = page;
+            this.currentLocation = location;
+            this.length = length;
+            this.current = null;
         }
 
         @Override
@@ -158,22 +178,42 @@ public class PageFile {
         public Page getPage() {
             return page;
         }
+
+        public byte[] getDiskBound() throws IOException {
+            if (diskBound == null && diskBoundLocation != -1) {
+                diskBound = new byte[length];
+                RandomAccessFile file = new RandomAccessFile(tmpFile, "r");
+                file.seek(diskBoundLocation);
+                int readNum = file.read(diskBound);
+                file.close();
+                diskBoundLocation = -1;
+            }
+            return diskBound;
+        }
         
         void begin() {
-           diskBound = current;
-           current = null;
+           if (currentLocation != -1) {
+              diskBoundLocation = currentLocation;
+              currentLocation = -1;
+              current = null;
+           }  else {
+              diskBound = current;
+              current = null;
+              currentLocation = -1;
+           }
         }
         
         /**
          * @return true if there is no pending writes to do.
          */
         boolean done() {
+            diskBoundLocation = -1;
             diskBound=null;
-            return current == null;
+            return current == null || currentLocation == -1;
         }
         
         boolean isDone() {
-            return diskBound == null && current == null;
+            return diskBound == null && diskBoundLocation == -1 && current == null && currentLocation == -1;
         }
 
     }
@@ -470,7 +510,7 @@ public class PageFile {
         return new File(directory, IOHelper.toFileSystemSafeName(name)+RECOVERY_FILE_SUFFIX);
     } 
 
-    private long toOffset(long pageId) {
+    public long toOffset(long pageId) {
         return PAGE_FILE_HEADER_SIZE+(pageId*pageSize);
     }
 
@@ -823,6 +863,8 @@ public class PageFile {
                 }
             }
 
+            boolean longTx = false;
+
             for (Map.Entry<Long, PageWrite> entry : updates) {
                 Long key = entry.getKey();
                 PageWrite value = entry.getValue();
@@ -830,12 +872,20 @@ public class PageFile {
                 if( write==null ) {
                     writes.put(key, value);
                 } else {
-                    write.setCurrent(value.page, value.current);
+                    if (value.currentLocation != -1) {
+                        write.setCurrentLocation(value.page, value.currentLocation, value.length);
+                        write.tmpFile = value.tmpFile;
+                        longTx = true;
+                    } else {
+                        write.setCurrent(value.page, value.current);
+                    }
                 }
             }
             
             // Once we start approaching capacity, notify the writer to start writing
-            if( canStartWriteBatch() ) {
+            // sync immediately for long txs
+            if( longTx || canStartWriteBatch() ) {
+
                 if( enabledWriteThread  ) {
                     writes.notify();
                 } else {
@@ -919,115 +969,90 @@ public class PageFile {
         }
     }
 
-    /**
-     * 
-     * @return true if there are still pending writes to do.
-     * @throws InterruptedException 
-     * @throws IOException 
-     */
-    private void writeBatch() throws IOException {
-            
-        CountDownLatch checkpointLatch;
-        ArrayList<PageWrite> batch;
-        synchronized( writes ) {
+     private void writeBatch() throws IOException {
+
+         CountDownLatch checkpointLatch;
+         ArrayList<PageWrite> batch;
+         synchronized( writes ) {
             // If there is not enough to write, wait for a notification...
 
             batch = new ArrayList<PageWrite>(writes.size());
             // build a write batch from the current write cache.
             for (PageWrite write : writes.values()) {
                 batch.add(write);
-                // Move the current write to the diskBound write, this lets folks update the 
+                // Move the current write to the diskBound write, this lets folks update the
                 // page again without blocking for this write.
                 write.begin();
-                if (write.diskBound == null) {
+                if (write.diskBound == null && write.diskBoundLocation == -1) {
                     batch.remove(write);
                 }
             }
 
-            // Grab on to the existing checkpoint latch cause once we do this write we can 
+            // Grab on to the existing checkpoint latch cause once we do this write we can
             // release the folks that were waiting for those writes to hit disk.
             checkpointLatch = this.checkpointLatch;
             this.checkpointLatch=null;
-        }
-        
-       try {
-            if (enableRecoveryFile) {
+         }
 
-                // Using Adler-32 instead of CRC-32 because it's much faster and
-                // it's
-                // weakness for short messages with few hundred bytes is not a
-                // factor in this case since we know
-                // our write batches are going to much larger.
-                Checksum checksum = new Adler32();
-                for (PageWrite w : batch) {
-                    try {
-                        checksum.update(w.diskBound, 0, pageSize);
-                    } catch (Throwable t) {
-                        throw IOExceptionSupport.create(
-                                "Cannot create recovery file. Reason: " + t, t);
-                    }
-                }
-
-                // Can we shrink the recovery buffer??
-                if (recoveryPageCount > recoveryFileMaxPageCount) {
-                    int t = Math.max(recoveryFileMinPageCount, batch.size());
-                    recoveryFile.setLength(recoveryFileSizeForPages(t));
-                }
-
-                // Record the page writes in the recovery buffer.
-                recoveryFile.seek(0);
-                // Store the next tx id...
-                recoveryFile.writeLong(nextTxid.get());
-                // Store the checksum for thw write batch so that on recovery we
-                // know if we have a consistent
-                // write batch on disk.
-                recoveryFile.writeLong(checksum.getValue());
-                // Write the # of pages that will follow
-                recoveryFile.writeInt(batch.size());
-
-                // Write the pages.
-                recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
-
-                for (PageWrite w : batch) {
-                    recoveryFile.writeLong(w.page.getPageId());
-                    recoveryFile.write(w.diskBound, 0, pageSize);
-                }
-
-                if (enableDiskSyncs) {
-                    // Sync to make sure recovery buffer writes land on disk..
-                    recoveryFile.getFD().sync();
-                }
-
-                recoveryPageCount = batch.size();
-            }
-
-            for (PageWrite w : batch) {
-                writeFile.seek(toOffset(w.page.getPageId()));
-                writeFile.write(w.diskBound, 0, pageSize);
-                w.done();
-            }
-
-            // Sync again
-            if (enableDiskSyncs) {
-                writeFile.getFD().sync();
-            }
-
-        } finally {
-            synchronized (writes) {
-                for (PageWrite w : batch) {
-                    // If there are no more pending writes, then remove it from
-                    // the write cache.
-                    if (w.isDone()) {
-                        writes.remove(w.page.getPageId());
-                    }
-                }
-            }
-            
-            if( checkpointLatch!=null ) {
-                checkpointLatch.countDown();
-            }
-        }
-    }
+         Checksum checksum = new Adler32();
+         recoveryFile.seek(RECOVERY_FILE_HEADER_SIZE);
+         for (PageWrite w : batch) {
+             if (enableRecoveryFile) {
+                 try {
+                     checksum.update(w.getDiskBound(), 0, pageSize);
+                 } catch (Throwable t) {
+                     throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
+                 }
+                 recoveryFile.writeLong(w.page.getPageId());
+                 recoveryFile.write(w.getDiskBound(), 0, pageSize);
+             }
+
+             writeFile.seek(toOffset(w.page.getPageId()));
+             writeFile.write(w.getDiskBound(), 0, pageSize);
+             w.done();
+         }
+
+         try {
+             if (enableRecoveryFile) {
+                 // Can we shrink the recovery buffer??
+                 if (recoveryPageCount > recoveryFileMaxPageCount) {
+                     int t = Math.max(recoveryFileMinPageCount, batch.size());
+                     recoveryFile.setLength(recoveryFileSizeForPages(t));
+                 }
+
+                 // Record the page writes in the recovery buffer.
+                 recoveryFile.seek(0);
+                 // Store the next tx id...
+                 recoveryFile.writeLong(nextTxid.get());
+                 // Store the checksum for thw write batch so that on recovery we
+                 // know if we have a consistent
+                 // write batch on disk.
+                 recoveryFile.writeLong(checksum.getValue());
+                 // Write the # of pages that will follow
+                 recoveryFile.writeInt(batch.size());
+             }
+
+             if (enableDiskSyncs) {
+                 // Sync to make sure recovery buffer writes land on disk..
+                 recoveryFile.getFD().sync();
+                 writeFile.getFD().sync();
+             }
+         } finally {
+             synchronized (writes) {
+                 for (PageWrite w : batch) {
+                     // If there are no more pending writes, then remove it from
+                     // the write cache.
+                     if (w.isDone()) {
+                         writes.remove(w.page.getPageId());
+                     }
+                 }
+             }
+
+             if (checkpointLatch != null) {
+                 checkpointLatch.countDown();
+             }
+         }
+     }
 
     private long recoveryFileSizeForPages(int pageCount) {
         return RECOVERY_FILE_HEADER_SIZE+((pageSize+8)*pageCount);
@@ -1135,4 +1160,7 @@ public class PageFile {
 		return getMainPageFile();
 	}
 
+    public File getDirectory() {
+        return directory;
+    }
 }
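
Two details of the PageWrite changes above are worth spelling out. A write now lives in
one of two states: in-memory (current != null, currentLocation == -1) or spilled
(current == null, currentLocation holding an offset into the transaction's temp file),
and getDiskBound() lazily reads the page image back only when writeBatch() flushes it;
writeBatch() itself now streams each page to the recovery file and the main data file
in a single pass, writing the recovery header (txid, checksum, page count) afterwards
with one sync at the end. Two spots look asymmetric against that model: done() returns
current == null || currentLocation == -1, which any in-memory write satisfies whether
or not it is still pending (an && would mirror isDone()), and getDiskBound() uses
read(), which may legally return fewer than length bytes where readFully() would not.
A standalone sketch of the spill round-trip, with readFully() as the defensive variant
(the file name and page contents are made up):

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;

    // Illustration of the pattern PageWrite now relies on: append a page image
    // to a temp file, keep only (offset, length), and read it back lazily.
    public class SpillRoundTrip {
        public static void main(String[] args) throws IOException {
            File tmp = File.createTempFile("kahadb-tx", ".tmp");
            byte[] pageImage = "page image bytes".getBytes();

            // Spill: remember where the image landed instead of holding it.
            RandomAccessFile out = new RandomAccessFile(tmp, "rw");
            long offset = out.length();
            out.seek(offset);
            out.write(pageImage);
            out.close();

            // Flush time: read the image back from (offset, length).
            byte[] diskBound = new byte[pageImage.length];
            RandomAccessFile in = new RandomAccessFile(tmp, "r");
            in.seek(offset);
            in.readFully(diskBound);   // read() may stop short; readFully() won't
            in.close();

            tmp.delete();
        }
    }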

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java?rev=1138442&r1=1138441&r2=1138442&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/page/Transaction.java Wed Jun 22 12:55:36 2011
@@ -16,22 +16,11 @@
  */
 package org.apache.kahadb.page;
 
-import java.io.DataInputStream;
-import java.io.EOFException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
+import java.io.*;
+import java.util.*;
 
 import org.apache.kahadb.page.PageFile.PageWrite;
-import org.apache.kahadb.util.ByteSequence;
-import org.apache.kahadb.util.DataByteArrayInputStream;
-import org.apache.kahadb.util.DataByteArrayOutputStream;
-import org.apache.kahadb.util.Marshaller;
-import org.apache.kahadb.util.Sequence;
-import org.apache.kahadb.util.SequenceSet;
+import org.apache.kahadb.util.*;
 
 /**
  * The class used to read/update a PageFile object.  Using a transaction allows you to
@@ -39,6 +28,11 @@ import org.apache.kahadb.util.SequenceSe
  */
 public class Transaction implements Iterable<Page> {
 
+
+    private RandomAccessFile tmpFile;
+    private File txfFile;
+    private int nextLocation = 0;
+
     /**
      * The PageOverflowIOException occurs when a page write is requested
      * and it's data is larger than what would fit into a single page.
@@ -91,12 +85,16 @@ public class Transaction implements Iter
     // If this transaction is updating stuff.. this is the tx of 
     private long writeTransactionId=-1;
     // List of pages that this transaction has modified.
-    private HashMap<Long, PageWrite> writes=new HashMap<Long, PageWrite>();
+    private TreeMap<Long, PageWrite> writes=new TreeMap<Long, PageWrite>();
     // List of pages allocated in this transaction
     private final SequenceSet allocateList = new SequenceSet();
     // List of pages freed in this transaction
     private final SequenceSet freeList = new SequenceSet();
 
+    private long maxTransactionSize = 10485760;
+
+    private long size = 0;
+
     Transaction(PageFile pageFile) {
         this.pageFile = pageFile;
     }
@@ -650,7 +648,16 @@ public class Transaction implements Iter
             allocateList.clear();
             writes.clear();
             writeTransactionId = -1;
+            if (tmpFile != null) {
+                tmpFile.close();
+                if (!getTempFile().delete()) {
+                    throw new IOException("Can't delete temporary KahaDB transaction file:"  + getTempFile());
+                }
+                tmpFile = null;
+                txfFile = null;
+            }
         }
+        size = 0;
     }
 
     /**
@@ -665,7 +672,16 @@ public class Transaction implements Iter
             allocateList.clear();
             writes.clear();
             writeTransactionId = -1;
+            if (tmpFile != null) {
+                tmpFile.close();
+                if (getTempFile().delete()) {
+                    throw new IOException("Can't delete temporary KahaDB transaction file:"  + getTempFile());
+                }
+                tmpFile = null;
+                txfFile = null;
+            }
         }
+        size = 0;
     }
 
     private long getWriteTransactionId() {
@@ -675,16 +691,36 @@ public class Transaction implements Iter
         return writeTransactionId;
     }
 
+
+    protected File getTempFile() {
+        if (txfFile == null) {
+            txfFile = new File(getPageFile().getDirectory(), IOHelper.toFileSystemSafeName(Long.toString(getWriteTransactionId())) + ".tmp");
+        }
+       return txfFile;
+    }
+
     /**
      * Queues up a page write that should get done when commit() gets called.
      */
     @SuppressWarnings("unchecked")
     private void write(final Page page, byte[] data) throws IOException {
         Long key = page.getPageId();
-        // TODO: if a large update transaction is in progress, we may want to move
-        // all the current updates to a temp file so that we don't keep using 
-        // up memory.
-        writes.put(key, new PageWrite(page, data));        
+        size += data.length;
+
+        PageWrite write;
+        if (size > maxTransactionSize) {
+            if (tmpFile == null) {
+                tmpFile = new RandomAccessFile(getTempFile(), "rw");
+            }
+            int location = nextLocation;
+            tmpFile.seek(nextLocation);
+            tmpFile.write(data);
+            nextLocation = location + data.length;
+            write = new PageWrite(page, location, data.length, getTempFile());
+        } else {
+            write = new PageWrite(page, data);
+        }
+        writes.put(key, write);
     }   
 
     /**
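
One further asymmetry in the Transaction diff above: the commit-path cleanup throws
when the temp file cannot be deleted (!getTempFile().delete()), while the rollback-path
cleanup as committed throws when the delete succeeds, and neither path resets
nextLocation. A sketch of a symmetric cleanup helper both paths could share (the
nextLocation reset is an assumption about intent, not something this revision does):

    // Sketch: shared temp-file cleanup for the commit and rollback paths.
    private void cleanupTempFile() throws IOException {
        if (tmpFile != null) {
            tmpFile.close();
            if (!getTempFile().delete()) {   // throw only when the delete FAILS
                throw new IOException("Can't delete temporary KahaDB transaction file: "
                        + getTempFile());
            }
            tmpFile = null;
            txfFile = null;
        }
        size = 0;
        nextLocation = 0;   // assumption: reset the append offset for tx reuse
    }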