You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/09/16 19:09:22 UTC

svn commit: r997849 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/store/kahadb/plist/PList.java main/java/org/apache/activemq/store/kahadb/plist/PListStore.java test/java/org/apache/activemq/bugs/AMQ2910Test.java

Author: gtully
Date: Thu Sep 16 17:09:22 2010
New Revision: 997849

URL: http://svn.apache.org/viewvc?rev=997849&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2910 - fix and test, index on temp plist store was not being protected

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java?rev=997849&r1=997848&r2=997849&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java Thu Sep 16 17:09:22 2010
@@ -34,10 +34,11 @@ public class PList {
     private long lastId = EntryLocation.NOT_SET;
     private final AtomicBoolean loaded = new AtomicBoolean();
     private int size = 0;
+    Object indexLock;
 
     PList(PListStore store) {
-
         this.store = store;
+        this.indexLock = store.getIndexLock();
     }
 
     public void setName(String name) {
@@ -108,11 +109,13 @@ public class PList {
     }
 
     public synchronized void destroy() throws IOException {
-        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-            public void execute(Transaction tx) throws IOException {
-                destroy(tx);
-            }
-        });
+        synchronized (indexLock) {
+            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    destroy(tx);
+                }
+            });
+        }
     }
 
     void destroy(Transaction tx) throws IOException {
@@ -158,15 +161,17 @@ public class PList {
     }
 
     synchronized public void addLast(final String id, final ByteSequence bs) throws IOException {
-        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-            public void execute(Transaction tx) throws IOException {
-                addLast(tx, id, bs);
-            }
-        });
+        final Location location = this.store.write(bs, false);
+        synchronized (indexLock) {
+            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    addLast(tx, id, bs, location);
+                }
+            });
+        }
     }
 
-    void addLast(Transaction tx, String id, ByteSequence bs) throws IOException {
-        Location location = this.store.write(bs, false);
+    private void addLast(Transaction tx, String id, ByteSequence bs, Location location) throws IOException {
         EntryLocation entry = createEntry(tx, id, this.lastId, EntryLocation.NOT_SET);
         entry.setLocation(location);
         storeEntry(tx, entry);
@@ -180,15 +185,17 @@ public class PList {
     }
 
     synchronized public void addFirst(final String id, final ByteSequence bs) throws IOException {
-        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-            public void execute(Transaction tx) throws IOException {
-                addFirst(tx, id, bs);
-            }
-        });
+        final Location location = this.store.write(bs, false);
+        synchronized (indexLock) {
+            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    addFirst(tx, id, bs, location);
+                }
+            });
+        }
     }
 
-    void addFirst(Transaction tx, String id, ByteSequence bs) throws IOException {
-        Location location = this.store.write(bs, false);
+    private void addFirst(Transaction tx, String id, ByteSequence bs, Location location) throws IOException {
         EntryLocation entry = createEntry(tx, id, EntryLocation.NOT_SET, EntryLocation.NOT_SET);
         entry.setLocation(location);
         EntryLocation oldFirst = getFirst(tx);
@@ -209,42 +216,50 @@ public class PList {
 
     synchronized public boolean remove(final String id) throws IOException {
         final AtomicBoolean result = new AtomicBoolean();
-        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-            public void execute(Transaction tx) throws IOException {
-                result.set(remove(tx, id));
-            }
-        });
+        synchronized (indexLock) {
+            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    result.set(remove(tx, id));
+                }
+            });
+        }
         return result.get();
     }
 
     synchronized public boolean remove(final int position) throws IOException {
         final AtomicBoolean result = new AtomicBoolean();
-        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-            public void execute(Transaction tx) throws IOException {
-                result.set(remove(tx, position));
-            }
-        });
+        synchronized (indexLock) {
+            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    result.set(remove(tx, position));
+                }
+            });
+        }
         return result.get();
     }
 
     synchronized public boolean remove(final PListEntry entry) throws IOException {
         final AtomicBoolean result = new AtomicBoolean();
-        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-            public void execute(Transaction tx) throws IOException {
-                result.set(doRemove(tx, entry.getEntry()));
-            }
-        });
+        synchronized (indexLock) {
+            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    result.set(doRemove(tx, entry.getEntry()));
+                }
+            });
+        }
         return result.get();
     }
 
     synchronized public PListEntry get(final int position) throws IOException {
         PListEntry result = null;
         final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
-        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-            public void execute(Transaction tx) throws IOException {
-                ref.set(get(tx, position));
-            }
-        });
+        synchronized (indexLock) {
+            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    ref.set(get(tx, position));
+                }
+            });
+        }
         if (ref.get() != null) {
             ByteSequence bs = this.store.getPayload(ref.get().getLocation());
             result = new PListEntry(ref.get(), bs);
@@ -255,14 +270,16 @@ public class PList {
     synchronized public PListEntry getFirst() throws IOException {
         PListEntry result = null;
         final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
-        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-            public void execute(Transaction tx) throws IOException {
-                ref.set(getFirst(tx));
+        synchronized (indexLock) {
+            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    ref.set(getFirst(tx));
+                }
+            });
+            if (ref.get() != null) {
+                ByteSequence bs = this.store.getPayload(ref.get().getLocation());
+                result = new PListEntry(ref.get(), bs);
             }
-        });
-        if (ref.get() != null) {
-            ByteSequence bs = this.store.getPayload(ref.get().getLocation());
-            result = new PListEntry(ref.get(), bs);
         }
         return result;
     }
@@ -270,14 +287,16 @@ public class PList {
     synchronized public PListEntry getLast() throws IOException {
         PListEntry result = null;
         final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
-        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-            public void execute(Transaction tx) throws IOException {
-                ref.set(getLast(tx));
+        synchronized (indexLock) {
+            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    ref.set(getLast(tx));
+                }
+            });
+            if (ref.get() != null) {
+                ByteSequence bs = this.store.getPayload(ref.get().getLocation());
+                result = new PListEntry(ref.get(), bs);
             }
-        });
-        if (ref.get() != null) {
-            ByteSequence bs = this.store.getPayload(ref.get().getLocation());
-            result = new PListEntry(ref.get(), bs);
         }
         return result;
     }
@@ -287,14 +306,16 @@ public class PList {
         final long nextId = entry != null ? entry.getEntry().getNext() : this.rootId;
         if (nextId != EntryLocation.NOT_SET) {
             final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
-            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-                public void execute(Transaction tx) throws IOException {
-                    ref.set(getNext(tx, nextId));
+            synchronized (indexLock) {
+                this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                    public void execute(Transaction tx) throws IOException {
+                        ref.set(getNext(tx, nextId));
+                    }
+                });
+                if (ref.get() != null) {
+                    ByteSequence bs = this.store.getPayload(ref.get().getLocation());
+                    result = new PListEntry(ref.get(), bs);
                 }
-            });
-            if (ref.get() != null) {
-                ByteSequence bs = this.store.getPayload(ref.get().getLocation());
-                result = new PListEntry(ref.get(), bs);
             }
         }
         return result;
@@ -303,14 +324,15 @@ public class PList {
     synchronized public PListEntry refresh(final PListEntry entry) throws IOException {
         PListEntry result = null;
         final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
-        this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
-            public void execute(Transaction tx) throws IOException {
-                ref.set(loadEntry(tx, entry.getEntry().getPage().getPageId()));
+        synchronized (indexLock) {
+            this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+                public void execute(Transaction tx) throws IOException {
+                    ref.set(loadEntry(tx, entry.getEntry().getPage().getPageId()));
+                }
+            });
+            if (ref.get() != null) {
+                result = new PListEntry(ref.get(), entry.getByteSequence());
             }
-        });
-        if (ref.get() != null) {
-
-            result = new PListEntry(ref.get(), entry.getByteSequence());
         }
         return result;
     }
@@ -390,7 +412,7 @@ public class PList {
         return null;
     }
 
-    boolean doRemove(Transaction tx, EntryLocation entry) throws IOException {
+    private boolean doRemove(Transaction tx, EntryLocation entry) throws IOException {
         boolean result = false;
         if (entry != null) {
 
@@ -450,6 +472,7 @@ public class PList {
         }
         return entry;
     }
+    
     private void storeEntry(Transaction tx, EntryLocation entry) throws IOException {
         tx.store(entry.getPage(), EntryLocationMarshaller.INSTANCE, true);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java?rev=997849&r1=997848&r2=997849&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java Thu Sep 16 17:09:22 2010
@@ -45,6 +45,9 @@ import org.apache.kahadb.util.LockFile;
 import org.apache.kahadb.util.StringMarshaller;
 import org.apache.kahadb.util.VariableMarshaller;
 
+/**
+ * @org.apache.xbean.XBean
+ */
 public class PListStore extends ServiceSupport {
     static final Log LOG = LogFactory.getLog(PListStore.class);
     private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -65,6 +68,11 @@ public class PListStore extends ServiceS
     MetaData metaData = new MetaData(this);
     final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
     Map<String, PList> persistentLists = new HashMap<String, PList>();
+    final Object indexLock = new Object();
+
+    public Object getIndexLock() {
+        return indexLock;
+    }
 
     protected class MetaData {
         protected MetaData(PListStore store) {
@@ -188,7 +196,7 @@ public class PListStore extends ServiceS
         }
     }
 
-    public PList getPList(final String name) throws Exception {
+    synchronized public PList getPList(final String name) throws Exception {
         if (!isStarted()) {
             throw new IllegalStateException("Not started");
         }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java?rev=997849&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java Thu Sep 16 17:09:22 2010
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.Vector;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.jms.ConnectionFactory;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsMultipleClientsTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+
+public class AMQ2910Test extends JmsMultipleClientsTestSupport {
+    // Test for AMQ-2910: many queues are sent to and then consumed from concurrently; no task may fail.
+    final int maxConcurrency = 60; // number of queues; one send task and one consumer task is submitted per queue
+    final int msgCount = 200; // count handed to sendMessages() for each queue
+    final Vector<Throwable> exceptions = new Vector<Throwable>(); // Throwables caught inside the executor tasks
+    // Builds the broker under test: TCP connector on port 0 plus a default policy with a file pending queue policy.
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        //persistent = true;
+        BrokerService broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.addConnector("tcp://localhost:0"); // port 0: actual URI is read back via getConnectUri() in the test
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry defaultEntry = new PolicyEntry();
+        defaultEntry.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
+        defaultEntry.setCursorMemoryHighWaterMark(50);
+        defaultEntry.setMemoryLimit(500*1024); // NOTE(review): small limits presumably force spooling to the cursor store - confirm
+        defaultEntry.setProducerFlowControl(false);
+        policyMap.setDefaultEntry(defaultEntry);
+        broker.setDestinationPolicy(policyMap);
+
+        broker.getSystemUsage().getMemoryUsage().setLimit(1000 * 1024);
+
+        return broker;
+    }
+    // Phase 1 sends to Queue-0..Queue-(maxConcurrency-1) in parallel; phase 2 starts consumers and counts deliveries.
+    public void testConcurrentSendToPendingCursor() throws Exception {
+        final ActiveMQConnectionFactory factory =
+                new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri());
+        factory.setCloseTimeout(30000);
+        ExecutorService executor = Executors.newCachedThreadPool();
+        for (int i=0; i<maxConcurrency; i++) {
+            final ActiveMQQueue dest = new ActiveMQQueue("Queue-" + i);
+            executor.execute(new Runnable() {
+                public void run() {
+                    try {
+                        sendMessages(factory.createConnection(), dest, msgCount);
+                    } catch (Throwable t) {
+                        exceptions.add(t); // recorded here and asserted on from the test thread
+                    }
+                }
+            });
+        }
+
+        executor.shutdown();
+        // No further tasks are accepted; wait up to 60 seconds for every send task to finish.
+
+        assertTrue("send completed", executor.awaitTermination(60, TimeUnit.SECONDS));
+        assertNoExceptions();
+        // Second phase: a fresh executor runs startConsumers() once per queue.
+
+        executor = Executors.newCachedThreadPool();
+        for (int i=0; i<maxConcurrency; i++) {
+            final ActiveMQQueue dest = new ActiveMQQueue("Queue-" + i);
+            executor.execute(new Runnable() {
+                public void run() {
+                    try {
+                        startConsumers(factory, dest);
+                    } catch (Throwable t) {
+                        exceptions.add(t);
+                    }
+                }
+            });
+        }
+
+        executor.shutdown();
+        assertTrue("consumers completed", executor.awaitTermination(60, TimeUnit.SECONDS));
+
+        allMessagesList.setMaximumDuration(120*1000); // presumably milliseconds (2 minutes) - TODO confirm
+        final int numExpected = maxConcurrency * msgCount; // every queue should deliver msgCount messages
+        allMessagesList.waitForMessagesToArrive(numExpected);
+
+        if (allMessagesList.getMessageCount() != numExpected) {
+            dumpAllThreads(getName()); // called only on a count mismatch, before the assertion below
+        }
+        allMessagesList.assertMessagesReceivedNoWait(numExpected);
+
+        assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+
+    }
+    // Prints the stack trace of every recorded Throwable, then fails the test if any were recorded.
+    private void assertNoExceptions() {
+        if (!exceptions.isEmpty()) {
+            for (Throwable t: exceptions) {
+                t.printStackTrace();
+            }
+        }
+        assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date