You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2007/12/12 23:54:00 UTC

svn commit: r603762 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/kaha/ main/java/org/apache/activemq/kaha/impl/ main/java/org/apache/activemq/store/amq/ main/java/org/apache/activemq/store/kahadaptor/ main/java/org/apache/activ...

Author: rajdavies
Date: Wed Dec 12 14:53:59 2007
New Revision: 603762

URL: http://svn.apache.org/viewvc?rev=603762&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-1507

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java?rev=603762&r1=603761&r2=603762&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/Store.java Wed Dec 12 14:53:59 2007
@@ -272,8 +272,22 @@
      */
     long size();
     
+    /**
+     * @return true if persistent indexes are used by default
+     */
     public boolean isPersistentIndex();
     
+	/**
+	 * Set a persistent index as the default if the parameter is true
+	 * @param persistentIndex
+	 */
 	public void setPersistentIndex(boolean persistentIndex);
+	
+	/**
+	 * An explicit call to initialize - this will also be called
+	 * implicitly for any other operation on the store.
+	 * @throws IOException
+	 */
+	public void initialize() throws IOException;
 	
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java?rev=603762&r1=603761&r2=603762&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/kaha/impl/KahaStore.java Wed Dec 12 14:53:59 2007
@@ -445,10 +445,10 @@
         }
         if (!initialized) {
 
-            LOG.info("Kaha Store using data directory " + directory);
+            
             lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
             lock();
-
+            LOG.info("Kaha Store using data directory " + directory);
             DataManager defaultDM = getDataManager(DEFAULT_CONTAINER_NAME);
             rootIndexManager = getIndexManager(defaultDM, DEFAULT_CONTAINER_NAME);
             IndexItem mapRoot = new IndexItem();
@@ -486,6 +486,7 @@
                 if (!BROKEN_FILE_LOCK) {
                     lock = lockFile.getChannel().tryLock();
                     if (lock == null) {
+                        initialized=false;
                         throw new StoreLockedExcpetion("Kaha Store " + directory.getName()
                                                        + "  is already opened by another application");
                     } else {
@@ -493,6 +494,7 @@
                     }
                 }
             } else { // already locked
+                initialized=false;
                 throw new StoreLockedExcpetion("Kaha Store " + directory.getName()
                                                + " is already opened by this application.");
             }
@@ -501,7 +503,7 @@
 
     private synchronized void unlock() throws IOException {
         if (!DISABLE_LOCKING && (null != directory) && (null != lock)) {
-            System.getProperties().remove(getPropertyKey());
+            System.clearProperty(getPropertyKey());
             if (lock.isValid()) {
                 lock.release();
             }
@@ -510,7 +512,6 @@
     }
 
     private String getPropertyKey() throws IOException {
-        // Is replaceAll() needed? Should test without it.
         return getClass().getName() + ".lock." + directory.getCanonicalPath();
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java?rev=603762&r1=603761&r2=603762&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/AMQPersistenceAdapter.java Wed Dec 12 14:53:59 2007
@@ -18,6 +18,8 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileLock;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -83,6 +85,10 @@
     private static final Log LOG = LogFactory.getLog(AMQPersistenceAdapter.class);
     private final ConcurrentHashMap<ActiveMQQueue, AMQMessageStore> queues = new ConcurrentHashMap<ActiveMQQueue, AMQMessageStore>();
     private final ConcurrentHashMap<ActiveMQTopic, AMQMessageStore> topics = new ConcurrentHashMap<ActiveMQTopic, AMQMessageStore>();
+    private static final String PROPERTY_PREFIX = "org.apache.activemq.store.amq";
+    private static final boolean BROKEN_FILE_LOCK;
+    private static final boolean DISABLE_LOCKING;
+
     private AsyncDataManager asyncDataManager;
     private ReferenceStoreAdapter referenceStoreAdapter;
     private TaskRunnerFactory taskRunnerFactory;
@@ -112,7 +118,10 @@
     private int indexKeySize = HashIndex.DEFAULT_KEY_SIZE;
     private int indexPageSize = HashIndex.DEFAULT_PAGE_SIZE;
     private Map<AMQMessageStore,Set<Integer>> dataFilesInProgress = new ConcurrentHashMap<AMQMessageStore,Set<Integer>> ();
-
+    private String directoryPath = "";
+    private RandomAccessFile lockFile;
+    private FileLock lock;
+    private boolean disableLocking = DISABLE_LOCKING;
 
     public String getBrokerName() {
         return this.brokerName;
@@ -141,13 +150,17 @@
             if (brokerService != null) {
                 this.directory = brokerService.getBrokerDataDirectory();
             } else {
+                
                 this.directory = new File(IOHelper.getDefaultDataDirectory(), IOHelper.toFileSystemSafeName(brokerName));
                 this.directory = new File(directory, "amqstore");
+                this.directoryPath=directory.getAbsolutePath();
             }
         }
         if (this.directoryArchive == null) {
             this.directoryArchive = new File(this.directory,"archive");
         }
+        lockFile = new RandomAccessFile(new File(directory, "lock"), "rw");
+        lock();
         LOG.info("AMQStore starting using directory: " + directory);
         this.directory.mkdirs();
         if (archiveDataLogs) {
@@ -240,6 +253,11 @@
         if (!started.compareAndSet(true, false)) {
             return;
         }
+        if (lockFile != null) {
+            lockFile.close();
+            lockFile = null;
+        }
+        unlock();
         this.usageManager.getMemoryUsage().removeUsageListener(this);
         synchronized (this) {
             Scheduler.cancel(periodicCheckpointTask);
@@ -818,7 +836,15 @@
 
     public void setArchiveDataLogs(boolean archiveDataLogs) {
         this.archiveDataLogs = archiveDataLogs;
-    }    
+    }  
+    
+    public boolean isDisableLocking() {
+        return disableLocking;
+    }
+
+    public void setDisableLocking(boolean disableLocking) {
+        this.disableLocking = disableLocking;
+    }
 
 	
 	protected void addInProgressDataFile(AMQMessageStore store,int dataFileId) {
@@ -836,4 +862,72 @@
             set.remove(dataFileId);
         }
     }
+	
+	
+	
+	protected void lock() throws IOException, InterruptedException {
+        boolean logged = false;
+        boolean aquiredLock = false;
+        do {
+            if (doLock()) {
+                aquiredLock = true;
+            } else {
+                if (!logged) {
+                    LOG.warn("Waiting to Lock the Store " + getDirectory());
+                    logged = true;
+                }
+                Thread.sleep(1000);
+            }
+
+            if (aquiredLock && logged) {
+                LOG.info("Aquired lock for AMQ Store" + getDirectory());
+            }
+
+        } while (!aquiredLock && !disableLocking);
+    }
+	
+	private synchronized void unlock() throws IOException {
+        if (!disableLocking && (null != directory) && (null != lock)) {
+            System.clearProperty(getPropertyKey());
+            if (lock.isValid()) {
+                lock.release();
+            }
+            lock = null;
+        }
+    }
+
+	
+	protected boolean doLock() throws IOException {
+	    boolean result = true;
+	    if (!disableLocking && directory != null && lock == null) {
+            String key = getPropertyKey();
+            String property = System.getProperty(key);
+            if (null == property) {
+                if (!BROKEN_FILE_LOCK) {
+                    lock = lockFile.getChannel().tryLock();
+                    if (lock == null) {
+                        result = false;
+                    } else {
+                        System.setProperty(key, new Date().toString());
+                    }
+                }
+            } else { // already locked
+                result = false;
+            }
+        }
+	    return result;
+	}
+	
+	private String getPropertyKey() throws IOException {
+        return getClass().getName() + ".lock." + directory.getCanonicalPath();
+    }
+	
+	static {
+	    BROKEN_FILE_LOCK = "true".equals(System.getProperty(PROPERTY_PREFIX
+	            + ".FileLockBroken",
+	            "false"));
+	    DISABLE_LOCKING = "true".equals(System.getProperty(PROPERTY_PREFIX
+	           + ".DisableLocking",
+	           "false"));
+	}
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java?rev=603762&r1=603761&r2=603762&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/amq/RecoveryListenerAdapter.java Wed Dec 12 14:53:59 2007
@@ -56,6 +56,7 @@
             return recoverMessage(message);
         } else {
             LOG.error("Message id " + ref + " could not be recovered from the data store!");
+            Thread.dumpStack();
         }
         return false;
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java?rev=603762&r1=603761&r2=603762&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadaptor/KahaReferenceStoreAdapter.java Wed Dec 12 14:53:59 2007
@@ -39,6 +39,7 @@
 import org.apache.activemq.kaha.MessageIdMarshaller;
 import org.apache.activemq.kaha.Store;
 import org.apache.activemq.kaha.StoreFactory;
+import org.apache.activemq.kaha.impl.StoreLockedExcpetion;
 import org.apache.activemq.kaha.impl.index.hash.HashIndex;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.store.ReferenceStore;
@@ -53,7 +54,7 @@
 
     
 
-    private static final Log LOG = LogFactory.getLog(KahaPersistenceAdapter.class);
+    private static final Log LOG = LogFactory.getLog(KahaReferenceStoreAdapter.class);
     private static final String STORE_STATE = "store-state";
     private static final String INDEX_VERSION_NAME = "INDEX_VERSION";
     private static final Integer INDEX_VERSION = new Integer(3);
@@ -87,7 +88,7 @@
     @Override
     public synchronized void start() throws Exception {
         super.start();
-        Store store = getStateStore();
+        Store store = getStateStore();        
         boolean empty = store.getMapContainerIds().isEmpty();
         stateMap = store.getMapContainer("state", STORE_STATE);
         stateMap.load();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java?rev=603762&r1=603761&r2=603762&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/util/BitArrayBin.java Wed Dec 12 14:53:59 2007
@@ -43,7 +43,7 @@
         maxNumberOfArrays = Math.max(maxNumberOfArrays, 1);
         list = new LinkedList<BitArray>();
         for (int i = 0; i < maxNumberOfArrays; i++) {
-            list.add(new BitArray());
+            list.add(null);
         }
     }
 
@@ -130,6 +130,10 @@
                 bin = list.size() - 1;
             }
             answer = list.get(bin);
+            if (answer == null) {
+                answer = new BitArray();
+                list.set(bin, answer);
+            }
         }
         return answer;
     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java?rev=603762&r1=603761&r2=603762&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTest.java Wed Dec 12 14:53:59 2007
@@ -38,20 +38,15 @@
     protected String uriString = "failover://(tcp://localhost:62001,tcp://localhost:62002)?randomize=false";
 
     protected void setUp() throws Exception {
+        messageCount = 10000;
         if (System.getProperty("basedir") == null) {
             File file = new File(".");
             System.setProperty("basedir", file.getAbsolutePath());
         }
         failureCount = super.messageCount / 2;
         super.topic = isTopic();
-        BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getMasterXml()));
-        brokerFactory.afterPropertiesSet();
-        master = brokerFactory.getBroker();
-        brokerFactory = new BrokerFactoryBean(new ClassPathResource(getSlaveXml()));
-        brokerFactory.afterPropertiesSet();
-        slave = brokerFactory.getBroker();
-        master.start();
-        slave.start();
+        createMaster();
+        createSlave();
         // wait for thing to connect
         Thread.sleep(1000);
         super.setUp();
@@ -87,5 +82,19 @@
 
     protected boolean isTopic() {
         return false;
+    }
+    
+    protected void createMaster() throws Exception {
+        BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getMasterXml()));
+        brokerFactory.afterPropertiesSet();
+        master = brokerFactory.getBroker();
+        master.start();
+    }
+    
+    protected void createSlave() throws Exception {
+        BrokerFactoryBean brokerFactory = new BrokerFactoryBean(new ClassPathResource(getSlaveXml()));
+        brokerFactory.afterPropertiesSet();
+        slave = brokerFactory.getBroker();
+        slave.start();
     }
 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java?rev=603762&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java Wed Dec 12 14:53:59 2007
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.broker.ft;
+
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.springframework.core.io.ClassPathResource;
+
+
+public class QueueMasterSlaveTestUsingSharedFileTest extends
+        QueueMasterSlaveTest {
+    
+    protected String getSlaveXml() {
+        return "org/apache/activemq/broker/ft/sharedFileSlave.xml";
+    }
+    
+    protected String getMasterXml() {
+        return "org/apache/activemq/broker/ft/sharedFileMaster.xml";
+    }
+    
+    protected void createSlave() throws Exception {
+        new Thread(new Runnable() {
+
+            public void run() {
+                try {
+                    QueueMasterSlaveTestUsingSharedFileTest.super.createSlave();
+                } catch (Exception e) {
+
+                    e.printStackTrace();
+                }
+
+            }
+
+        }).start();
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/ft/QueueMasterSlaveTestUsingSharedFileTest.java
------------------------------------------------------------------------------
    svn:eol-style = native