You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2008/11/20 21:12:12 UTC

svn commit: r719350 - in /activemq/sandbox/kahadb/src: main/java/org/apache/kahadb/journal/ main/java/org/apache/kahadb/page/ main/java/org/apache/kahadb/replication/ main/java/org/apache/kahadb/store/ main/java/org/apache/kahadb/util/ test/java/org/ap...

Author: chirino
Date: Thu Nov 20 12:12:12 2008
New Revision: 719350

URL: http://svn.apache.org/viewvc?rev=719350&view=rev
Log:
- simplified the file names used for the kahadb
- Got rid of the ControlFile since recovering the last journal file is quick anyways.
- Moved the locking logic out of the ControlFile into a new LockFile class.

Added:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java
Removed:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/ControlFile.java
Modified:
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java
    activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreRecoveryBrokerTest.java
    activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java?rev=719350&r1=719349&r2=719350&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Journal.java Thu Nov 20 12:12:12 2008
@@ -71,9 +71,10 @@
 
     public static final byte DATA_ITEM_TYPE = 1;
     public static final byte REDO_ITEM_TYPE = 2;
-    public static final String DEFAULT_DIRECTORY = "data";
+    public static final String DEFAULT_DIRECTORY = ".";
     public static final String DEFAULT_ARCHIVE_DIRECTORY = "data-archive";
-    public static final String DEFAULT_FILE_PREFIX = "data-";
+    public static final String DEFAULT_FILE_PREFIX = "db-";
+    public static final String DEFAULT_FILE_SUFFIX = ".log";
     public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
     public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
     public static final int PREFERED_DIFF = 1024 * 512;
@@ -85,7 +86,7 @@
     protected File directory = new File(DEFAULT_DIRECTORY);
     protected File directoryArchive = new File(DEFAULT_ARCHIVE_DIRECTORY);
     protected String filePrefix = DEFAULT_FILE_PREFIX;
-    protected ControlFile controlFile;
+    protected String fileSuffix = DEFAULT_FILE_SUFFIX;
     protected boolean started;
     protected boolean useNio = true;
 
@@ -99,7 +100,6 @@
     protected Map<File, DataFile> fileByFileMap = new LinkedHashMap<File, DataFile>();
     protected LinkedNodeList<DataFile> dataFiles = new LinkedNodeList<DataFile>();
 
-    protected Location mark;
     protected final AtomicReference<Location> lastAppendLocation = new AtomicReference<Location>();
     protected Runnable cleanupTask;
     protected final AtomicLong totalLength = new AtomicLong();
@@ -111,16 +111,12 @@
         if (started) {
             return;
         }
-
+        
+        long start = System.currentTimeMillis();
         accessorPool = new DataFileAccessorPool(this);
         started = true;
         preferedFileLength = Math.max(PREFERED_DIFF, getMaxFileLength() - PREFERED_DIFF);
-        lock();
 
-        ByteSequence sequence = controlFile.load();
-        if (sequence != null && sequence.getLength() > 0) {
-            unmarshallState(sequence);
-        }
         if (useNio) {
             appender = new NIODataFileAppender(this);
         } else {
@@ -129,7 +125,7 @@
 
         File[] files = directory.listFiles(new FilenameFilter() {
             public boolean accept(File dir, String n) {
-                return dir.equals(directory) && n.startsWith(filePrefix);
+                return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
             }
         });
 
@@ -138,7 +134,7 @@
                 try {
                     File file = files[i];
                     String n = file.getName();
-                    String numStr = n.substring(filePrefix.length(), n.length());
+                    String numStr = n.substring(filePrefix.length(), n.length()-fileSuffix.length());
                     int num = Integer.parseInt(numStr);
                     DataFile dataFile = new DataFile(file, num, preferedFileLength);
                     fileMap.put(dataFile.getDataFileId(), dataFile);
@@ -178,27 +174,14 @@
             }
         }
 
-        ByteSequence storedState = storeState(true);
-        if( dataFiles.isEmpty() ) {
-          appender.storeItem(storedState, Location.MARK_TYPE, true);
-        }
-        
         cleanupTask = new Runnable() {
             public void run() {
                 cleanup();
             }
         };
         Scheduler.executePeriodically(cleanupTask, DEFAULT_CLEANUP_INTERVAL);
-    }
-
-    public void lock() throws IOException {
-        synchronized (this) {
-            if (controlFile == null) {
-                IOHelper.mkdirs(directory);
-                controlFile = new ControlFile(new File(directory, filePrefix + "control"), CONTROL_RECORD_MAX_LENGTH);
-            }
-            controlFile.lock();
-        }
+        long end = System.currentTimeMillis();
+        LOG.trace("Startup took: "+(end-start)+" ms");
     }
 
     protected Location recoveryCheck(DataFile dataFile, Location location) throws IOException {
@@ -219,46 +202,6 @@
         return location;
     }
 
-    protected void unmarshallState(ByteSequence sequence) throws IOException {
-        ByteArrayInputStream bais = new ByteArrayInputStream(sequence.getData(), sequence.getOffset(), sequence.getLength());
-        DataInputStream dis = new DataInputStream(bais);
-        if (dis.readBoolean()) {
-            mark = new Location();
-            mark.readExternal(dis);
-        } else {
-            mark = null;
-        }
-        if (dis.readBoolean()) {
-            Location l = new Location();
-            l.readExternal(dis);
-            lastAppendLocation.set(l);
-        } else {
-            lastAppendLocation.set(null);
-        }
-    }
-
-    private synchronized ByteSequence marshallState() throws IOException {
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        DataOutputStream dos = new DataOutputStream(baos);
-
-        if (mark != null) {
-            dos.writeBoolean(true);
-            mark.writeExternal(dos);
-        } else {
-            dos.writeBoolean(false);
-        }
-        Location l = lastAppendLocation.get();
-        if (l != null) {
-            dos.writeBoolean(true);
-            l.writeExternal(dos);
-        } else {
-            dos.writeBoolean(false);
-        }
-
-        byte[] bs = baos.toByteArray();
-        return new ByteSequence(bs, 0, bs.length);
-    }
-
     synchronized DataFile allocateLocation(Location location) throws IOException {
         if (dataFiles.isEmpty()|| ((dataFiles.getTail().getLength() + location.getSize()) > maxFileLength)) {
             int nextNum = !dataFiles.isEmpty() ? dataFiles.getTail().getDataFileId().intValue() + 1 : 1;
@@ -280,7 +223,7 @@
     }
 
 	public File getFile(int nextNum) {
-		String fileName = filePrefix + nextNum;
+		String fileName = filePrefix + nextNum + fileSuffix;
 		File file = new File(directory, fileName);
 		return file;
 	}
@@ -290,7 +233,7 @@
         DataFile dataFile = fileMap.get(key);
         if (dataFile == null) {
             LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
-            throw new IOException("Could not locate data file " + filePrefix + item.getDataFileId());
+            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
         }
         return dataFile;
     }
@@ -300,7 +243,7 @@
         DataFile dataFile = fileMap.get(key);
         if (dataFile == null) {
             LOG.error("Looking for key " + key + " but not found in fileMap: " + fileMap);
-            throw new IOException("Could not locate data file " + filePrefix + item.getDataFileId());
+            throw new IOException("Could not locate data file " + getFile(item.getDataFileId()));
         }
         return dataFile.getFile();
     }
@@ -315,13 +258,9 @@
         }
         Scheduler.cancel(cleanupTask);
         accessorPool.close();
-        storeState(false);
         appender.close();
         fileMap.clear();
         fileByFileMap.clear();
-        controlFile.unlock();
-        controlFile.dispose();
-        controlFile=null;
         dataFiles.clear();
         lastAppendLocation.set(null);
         started = false;
@@ -348,7 +287,6 @@
         fileMap.clear();
         fileByFileMap.clear();
         lastAppendLocation.set(null);
-        mark = null;
         dataFiles = new LinkedNodeList<DataFile>();
 
         // reopen open file handles...
@@ -416,10 +354,6 @@
         return directory.toString();
     }
 
-    public synchronized Location getMark() throws IllegalStateException {
-        return mark;
-    }
-
 	public synchronized void appendedExternally(Location loc, int length) throws IOException {
 		DataFile dataFile = null;
 		if( dataFiles.getTail().getDataFileId() == loc.getDataFileId() ) {
@@ -564,19 +498,6 @@
         return rc;
     }
 
-    public void setMark(Location location, boolean sync) throws IOException, IllegalStateException {
-        synchronized (this) {
-            mark = location;
-        }
-        storeState(sync);
-    }
-
-    protected synchronized ByteSequence storeState(boolean sync) throws IOException {
-        ByteSequence state = marshallState();
-        controlFile.store(state, sync);
-        return state;
-    }
-
     public synchronized Location write(ByteSequence data, boolean sync) throws IOException, IllegalStateException {
         Location loc = appender.storeItem(data, Location.USER_TYPE, sync);
         return loc;
@@ -697,5 +618,13 @@
 		return replicationTarget;
 	}
 
+    public String getFileSuffix() {
+        return fileSuffix;
+    }
+
+    public void setFileSuffix(String fileSuffix) {
+        this.fileSuffix = fileSuffix;
+    }
+
 
 }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java?rev=719350&r1=719349&r2=719350&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/journal/Location.java Thu Nov 20 12:12:12 2008
@@ -28,7 +28,6 @@
  */
 public final class Location implements Comparable<Location> {
 
-    public static final byte MARK_TYPE = -1;
     public static final byte USER_TYPE = 1;
     public static final byte NOT_SET_TYPE = 0;
     public static final int NOT_SET = -1;

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java?rev=719350&r1=719349&r2=719350&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/page/PageFile.java Thu Nov 20 12:12:12 2008
@@ -26,6 +26,8 @@
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.io.RandomAccessFile;
+import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -46,6 +48,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.IOExceptionSupport;
 import org.apache.kahadb.util.IOHelper;
 import org.apache.kahadb.util.IntrospectionSupport;
 import org.apache.kahadb.util.LRUCache;
@@ -65,6 +68,10 @@
  */
 public class PageFile {
     
+    private static final String PAGEFILE_SUFFIX = ".data";
+    private static final String RECOVERY_FILE_SUFFIX = ".redo";
+    private static final String FREE_FILE_SUFFIX = ".free";
+    
     // 4k Default page size.
     public static final int DEFAULT_PAGE_SIZE = Integer.parseInt(System.getProperty("defaultPageSize", ""+1024*4)); 
     private static final int RECOVERY_FILE_HEADER_SIZE=1024*4;
@@ -454,15 +461,15 @@
     // Private Implementation Methods
     ///////////////////////////////////////////////////////////////////
     private File getMainPageFile() {
-        return new File(directory, IOHelper.toFileSystemSafeName(name)+".dat");
+        return new File(directory, IOHelper.toFileSystemSafeName(name)+PAGEFILE_SUFFIX);
     }
     
     public File getFreeFile() {
-        return new File(directory, IOHelper.toFileSystemSafeName(name)+".fre");
+        return new File(directory, IOHelper.toFileSystemSafeName(name)+FREE_FILE_SUFFIX);
     } 
 
     public File getRecoveryFile() {
-        return new File(directory, IOHelper.toFileSystemSafeName(name)+".rec");
+        return new File(directory, IOHelper.toFileSystemSafeName(name)+RECOVERY_FILE_SUFFIX);
     } 
 
     private long toOffset(long pageId) {

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java?rev=719350&r1=719349&r2=719350&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationMaster.java Thu Nov 20 12:12:12 2008
@@ -242,7 +242,9 @@
 
 					int snapshotId = nextSnapshotId.incrementAndGet();
 					File file = store.getPageFile().getFile();
-					snapshotFile = new File(file.getParentFile(), "snapshot-" + snapshotId);
+					File dir = replicationServer.getTempReplicationDir();
+					dir.mkdirs();
+					snapshotFile = new File(dir, "snapshot-" + snapshotId);
 					
 					journalReplicatedFiles = new HashSet<Integer>();
 					

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java?rev=719350&r1=719349&r2=719350&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/replication/ReplicationService.java Thu Nov 20 12:12:12 2008
@@ -38,6 +38,8 @@
  */
 public class ReplicationService implements Service, ClusterListener {
 
+    private static final String JOURNAL_PREFIX = "journal-";
+
     private static final Log LOG = LogFactory.getLog(ReplicationService.class);
 
     private String brokerURI = "xbean:broker.xml";
@@ -192,10 +194,10 @@
         if (fn.equals("database")) {
             return getStore().getPageFile().getFile();
         }
-        if (fn.startsWith("journal-")) {
+        if (fn.startsWith(JOURNAL_PREFIX)) {
             int id;
             try {
-                id = Integer.parseInt(fn.substring("journal-".length()));
+                id = Integer.parseInt(fn.substring(JOURNAL_PREFIX.length()));
             } catch (NumberFormatException e) {
                 throw new IOException("Unknown replication file name: " + fn);
             }
@@ -210,10 +212,10 @@
         if (fn.equals("database")) {
             return new File(getTempReplicationDir(), "database-" + snapshotId);
         }
-        if (fn.startsWith("journal-")) {
+        if (fn.startsWith(JOURNAL_PREFIX)) {
             int id;
             try {
-                id = Integer.parseInt(fn.substring("journal-".length()));
+                id = Integer.parseInt(fn.substring(JOURNAL_PREFIX.length()));
             } catch (NumberFormatException e) {
                 throw new IOException("Unknown replication file name: " + fn);
             }

Modified: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java?rev=719350&r1=719349&r2=719350&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java (original)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/store/MessageDatabase.java Thu Nov 20 12:12:12 2008
@@ -48,7 +48,6 @@
 import org.apache.kahadb.page.Page;
 import org.apache.kahadb.page.PageFile;
 import org.apache.kahadb.page.Transaction;
-import org.apache.kahadb.replication.pb.PBJournalLocation;
 import org.apache.kahadb.store.data.KahaAddMessageCommand;
 import org.apache.kahadb.store.data.KahaCommitCommand;
 import org.apache.kahadb.store.data.KahaDestination;
@@ -65,6 +64,7 @@
 import org.apache.kahadb.util.ByteSequence;
 import org.apache.kahadb.util.DataByteArrayInputStream;
 import org.apache.kahadb.util.DataByteArrayOutputStream;
+import org.apache.kahadb.util.LockFile;
 import org.apache.kahadb.util.LongMarshaller;
 import org.apache.kahadb.util.Marshaller;
 import org.apache.kahadb.util.StringMarshaller;
@@ -72,7 +72,7 @@
 public class MessageDatabase {
 
     private static final Log LOG = LogFactory.getLog(MessageDatabase.class);
-    private static final int JOURNAL_LOCKED_WAIT_DELAY = 10 * 1000;
+    private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
 
     public static final int CLOSED_STATE = 1;
     public static final int OPEN_STATE = 2;
@@ -141,7 +141,7 @@
 
     protected MetadataMarshaller metadataMarshaller = new MetadataMarshaller();
 
-    protected boolean failIfJournalIsLocked;
+    protected boolean failIfDatabaseIsLocked;
 
     protected boolean deleteAllMessages;
     protected File directory;
@@ -152,6 +152,7 @@
     
     protected AtomicBoolean started = new AtomicBoolean();
     protected AtomicBoolean opened = new AtomicBoolean();
+    private LockFile lockFile;
 
     public MessageDatabase() {
     }
@@ -170,6 +171,7 @@
 
 	private void loadPageFile() throws IOException {
 		synchronized (indexMutex) {
+		    final PageFile pageFile = getPageFile();
             pageFile.load();
             pageFile.tx().execute(new Transaction.Closure<IOException>() {
                 public void execute(Transaction tx) throws IOException {
@@ -210,27 +212,32 @@
         }
 	}
 	
+	/**
+	 * @throws IOException
+	 */
 	public void open() throws IOException {
 		if( opened.compareAndSet(false, true) ) {
-	        getJournal();
-	        if (failIfJournalIsLocked) {
-	            journal.lock();
+            File lockFileName = new File(directory, "lock");
+            lockFile = new LockFile(lockFileName, true);
+	        if (failIfDatabaseIsLocked) {
+	            lockFile.lock();
 	        } else {
 	            while (true) {
 	                try {
-	                    journal.lock();
+	                    lockFile.lock();
 	                    break;
 	                } catch (IOException e) {
-	                    LOG.info("Journal is locked... waiting " + (JOURNAL_LOCKED_WAIT_DELAY / 1000) + " seconds for the journal to be unlocked.");
+	                    LOG.info("Database "+lockFileName+" is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000) + " seconds for the database to be unlocked.");
 	                    try {
-	                        Thread.sleep(JOURNAL_LOCKED_WAIT_DELAY);
+	                        Thread.sleep(DATABASE_LOCKED_WAIT_DELAY);
 	                    } catch (InterruptedException e1) {
 	                    }
 	                }
 	            }
-	        }        
-	        getPageFile();
-	        journal.start();
+	        }
+	        
+            getJournal().start();
+            
 	        loadPageFile();
 	        
 	        checkpointThread = new Thread("ActiveMQ Journal Checkpoint Worker") {
@@ -296,6 +303,8 @@
 	        }
 	        journal.close();
 	        checkpointThread.join();
+	        lockFile.unlock();
+	        lockFile=null;
 		}
 	}
 	
@@ -1158,13 +1167,13 @@
     // /////////////////////////////////////////////////////////////////
 
     private PageFile createPageFile() {
-        PageFile pf = new PageFile(directory, "database");
+        PageFile pf = new PageFile(directory, "db");
         return pf;
     }
 
     private Journal createJournal() {
         Journal manager = new Journal();
-        manager.setDirectory(new File(directory, "journal"));
+        manager.setDirectory(directory);
         manager.setMaxFileLength(1024 * 1024 * 20);
         manager.setUseNio(false);
         return manager;
@@ -1223,4 +1232,12 @@
         }
 		return journal;
 	}
+
+    public boolean isFailIfDatabaseIsLocked() {
+        return failIfDatabaseIsLocked;
+    }
+
+    public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) {
+        this.failIfDatabaseIsLocked = failIfDatabaseIsLocked;
+    }
 }

Added: activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java?rev=719350&view=auto
==============================================================================
--- activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java (added)
+++ activemq/sandbox/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java Thu Nov 20 12:12:12 2008
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kahadb.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
+
+/**
+ * Used to lock a File.
+ * 
+ * @author chirino
+ */
+public class LockFile {
+    
+    private static final boolean DISABLE_FILE_LOCK = "true".equals(System.getProperty("java.nio.channels.FileLock.broken", "false"));
+    final private File file;
+    
+    private FileLock lock;
+    private RandomAccessFile readFile;
+    private int lockCounter;
+    private final boolean deleteOnUnlock;
+    
+    public LockFile(File file, boolean deleteOnUnlock) {
+        this.file = file;
+        this.deleteOnUnlock = deleteOnUnlock;
+    }
+
+    /**
+     * @throws IOException
+     */
+    synchronized public void lock() throws IOException {
+        if (DISABLE_FILE_LOCK) {
+            return;
+        }
+
+        lockCounter++;
+        if( lockCounter!=1 ) {
+            return;
+        }
+        
+        IOHelper.mkdirs(file.getParentFile());
+        readFile = new RandomAccessFile(file, "rw");        
+        if (lock == null) {
+            try {
+                lock = readFile.getChannel().tryLock();
+            } catch (OverlappingFileLockException e) {
+                throw IOExceptionSupport.create("File '" + file + "' could not be locked.",e);
+            }
+            if (lock == null) {
+                throw new IOException("File '" + file + "' could not be locked.");
+            }
+        }
+    }
+
+    /**
+     */
+    public void unlock() {
+        if (DISABLE_FILE_LOCK) {
+            return;
+        }
+        
+        lockCounter--;
+        if( lockCounter!=0 ) {
+            return;
+        }
+        
+        // release the lock..
+        if (lock != null) {
+            try {
+                lock.release();
+            } catch (Throwable ignore) {
+            }
+            lock = null;
+        }
+        // close the file.
+        if (readFile != null) {
+            try {
+                readFile.close();
+            } catch (Throwable ignore) {
+            }
+            readFile = null;
+        }
+        
+        if( deleteOnUnlock ) {
+            file.delete();
+        }
+    }
+
+}

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreRecoveryBrokerTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreRecoveryBrokerTest.java?rev=719350&r1=719349&r2=719350&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreRecoveryBrokerTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/KahaDBStoreRecoveryBrokerTest.java Thu Nov 20 12:12:12 2008
@@ -17,12 +17,10 @@
 package org.apache.kahadb.store;
 
 import java.io.File;
-import java.net.URI;
 import java.util.ArrayList;
 
 import junit.framework.Test;
 
-import org.apache.activemq.broker.BrokerFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.RecoveryBrokerTest;
 import org.apache.activemq.broker.StubConnection;

Modified: activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java
URL: http://svn.apache.org/viewvc/activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java?rev=719350&r1=719349&r2=719350&view=diff
==============================================================================
--- activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java (original)
+++ activemq/sandbox/kahadb/src/test/java/org/apache/kahadb/store/perf/KahaBulkLoadingTest.java Thu Nov 20 12:12:12 2008
@@ -20,12 +20,10 @@
 import java.io.IOException;
 import java.net.URISyntaxException;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.BytesMessage;
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
-import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Session;
@@ -35,7 +33,6 @@
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.JmsTestSupport;
 import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.ProgressPrinter;
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;