You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/06/18 14:27:51 UTC

svn commit: r955973 - in /activemq/trunk: activemq-core/src/main/java/org/apache/activemq/store/kahadb/ activemq-ra/src/test/java/org/apache/activemq/ra/ kahadb/src/main/java/org/apache/kahadb/util/

Author: gtully
Date: Fri Jun 18 12:27:51 2010
New Revision: 955973

URL: http://svn.apache.org/viewvc?rev=955973&view=rev
Log:
Have the KahaDB lock work within a single VM, so master/slave tests can work ok. Preserve Kaha behaviour in this regard as it makes testing simpler. Fix NPE on shutdown if start fails.

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
    activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java
    activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java?rev=955973&r1=955972&r2=955973&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java Fri Jun 18 12:27:51 2010
@@ -205,7 +205,9 @@ public class KahaDBStore extends Message
     public void doStop(ServiceStopper stopper) throws Exception {
         //drain down async jobs
         LOG.info("Stopping async queue tasks");
-        this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
+        if (this.globalQueueSemaphore != null) {
+            this.globalQueueSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
+        }
         synchronized (this.asyncQueueMap) {
             for (StoreQueueTask task : this.asyncQueueMap.values()) {
                 task.cancel();
@@ -213,7 +215,9 @@ public class KahaDBStore extends Message
             this.asyncQueueMap.clear();
         }
         LOG.info("Stopping async topic tasks");
-        this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
+        if (this.globalTopicSemaphore != null) {
+            this.globalTopicSemaphore.tryAcquire(this.maxAsyncJobs, 60, TimeUnit.SECONDS);
+        }
         synchronized (this.asyncTopicMap) {
             for (StoreTopicTask task : this.asyncTopicMap.values()) {
                 task.cancel();

Modified: activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java?rev=955973&r1=955972&r2=955973&view=diff
==============================================================================
--- activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java (original)
+++ activemq/trunk/activemq-ra/src/test/java/org/apache/activemq/ra/FailoverManagedClusterTest.java Fri Jun 18 12:27:51 2010
@@ -49,9 +49,12 @@ import junit.framework.TestCase;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 public class FailoverManagedClusterTest extends TestCase {
-
+    private static final Log LOG = LogFactory.getLog(FailoverManagedClusterTest.class);
+    
     long txGenerator = System.currentTimeMillis();
     
     private static final String MASTER_BIND_ADDRESS = "tcp://0.0.0.0:61616";
@@ -61,12 +64,25 @@ public class FailoverManagedClusterTest 
     
     private BrokerService master;
     private BrokerService slave;
+    private CountDownLatch slaveThreadStarted = new CountDownLatch(1);
 
+    @Override
     protected void setUp() throws Exception {
         createAndStartMaster();
         createAndStartSlave();    
     }
     
+    @Override
+    protected void tearDown() throws Exception {
+        if (slave != null) {
+            slave.stop();
+        }
+        if (master != null) {
+            master.stop();
+        }
+    }
+
+
 
     private void createAndStartMaster() throws Exception {
         master = new BrokerService();
@@ -88,8 +104,9 @@ public class FailoverManagedClusterTest 
         new Thread(new Runnable() {
             public void run() {
                 try {
+                    slaveThreadStarted.countDown();
                     slave.start();
-                    System.out.println("slave has started");
+                    LOG.info("slave has started");
                 } catch (Exception e) {
                     e.printStackTrace();
                 }
@@ -112,7 +129,7 @@ public class FailoverManagedClusterTest 
 
         final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
             public void onMessage(Message message) {
-                System.out.println("Received message " + message);
+                LOG.info("Received message " + message);
                 super.onMessage(message);
                 messageDelivered.countDown();
             };
@@ -144,18 +161,14 @@ public class FailoverManagedClusterTest 
         } catch (InterruptedException e) {
         }
 
-        // Send the broker a message to that endpoint
         MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
-
-        // force a failover
+        slaveThreadStarted.await(10, TimeUnit.SECONDS);
+        
+        // force a failover before send
+        LOG.info("Stopping master to force failover..");
         master.stop();
-        slave.waitUntilStarted();
-
-        try {
-            Thread.sleep(2000);
-        } catch (InterruptedException ie) {
-            // ignore
-        }
+        master = null;
+        assertTrue("slave started ok", slave.waitUntilStarted());
 
         producer.send(session.createTextMessage("Hello, again!"));
 

Modified: activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java
URL: http://svn.apache.org/viewvc/activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java?rev=955973&r1=955972&r2=955973&view=diff
==============================================================================
--- activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java (original)
+++ activemq/trunk/kahadb/src/main/java/org/apache/kahadb/util/LockFile.java Fri Jun 18 12:27:51 2010
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.channels.FileLock;
 import java.nio.channels.OverlappingFileLockException;
+import java.util.Date;
 
 /**
  * Used to lock a File.
@@ -55,7 +56,9 @@ public class LockFile {
         }
         
         IOHelper.mkdirs(file.getParentFile());
-        
+        if (System.getProperty(getVmLockKey()) != null) {
+            throw new IOException("File '" + file + "' could not be locked as lock is already held for this jvm.");
+        }
         if (lock == null) {
             readFile = new RandomAccessFile(file, "rw");
             IOException reason = null;
@@ -66,6 +69,7 @@ public class LockFile {
             }
             if (lock != null) {
                 lockCounter++;
+                System.setProperty(getVmLockKey(), new Date().toString());
             } else {
                 // new read file for next attempt
                 closeReadFile();
@@ -94,6 +98,7 @@ public class LockFile {
         if (lock != null) {
             try {
                 lock.release();
+                System.getProperties().remove(getVmLockKey());
             } catch (Throwable ignore) {
             }
             lock = null;
@@ -105,6 +110,10 @@ public class LockFile {
         }
     }
 
+    private String getVmLockKey() throws IOException {
+        return getClass().getName() + ".lock." + file.getCanonicalPath();
+    }
+
     private void closeReadFile() {
         // close the file.
         if (readFile != null) {