You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/09/16 19:09:22 UTC
svn commit: r997849 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/store/kahadb/plist/PList.java
main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
test/java/org/apache/activemq/bugs/AMQ2910Test.java
Author: gtully
Date: Thu Sep 16 17:09:22 2010
New Revision: 997849
URL: http://svn.apache.org/viewvc?rev=997849&view=rev
Log:
resolve https://issues.apache.org/activemq/browse/AMQ-2910 - fix and test, index on temp plist store was not being protected
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java?rev=997849&r1=997848&r2=997849&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PList.java Thu Sep 16 17:09:22 2010
@@ -34,10 +34,11 @@ public class PList {
private long lastId = EntryLocation.NOT_SET;
private final AtomicBoolean loaded = new AtomicBoolean();
private int size = 0;
+ Object indexLock;
PList(PListStore store) {
-
this.store = store;
+ this.indexLock = store.getIndexLock();
}
public void setName(String name) {
@@ -108,11 +109,13 @@ public class PList {
}
public synchronized void destroy() throws IOException {
- this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- destroy(tx);
- }
- });
+ synchronized (indexLock) {
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ destroy(tx);
+ }
+ });
+ }
}
void destroy(Transaction tx) throws IOException {
@@ -158,15 +161,17 @@ public class PList {
}
synchronized public void addLast(final String id, final ByteSequence bs) throws IOException {
- this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- addLast(tx, id, bs);
- }
- });
+ final Location location = this.store.write(bs, false);
+ synchronized (indexLock) {
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ addLast(tx, id, bs, location);
+ }
+ });
+ }
}
- void addLast(Transaction tx, String id, ByteSequence bs) throws IOException {
- Location location = this.store.write(bs, false);
+ private void addLast(Transaction tx, String id, ByteSequence bs, Location location) throws IOException {
EntryLocation entry = createEntry(tx, id, this.lastId, EntryLocation.NOT_SET);
entry.setLocation(location);
storeEntry(tx, entry);
@@ -180,15 +185,17 @@ public class PList {
}
synchronized public void addFirst(final String id, final ByteSequence bs) throws IOException {
- this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- addFirst(tx, id, bs);
- }
- });
+ final Location location = this.store.write(bs, false);
+ synchronized (indexLock) {
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ addFirst(tx, id, bs, location);
+ }
+ });
+ }
}
- void addFirst(Transaction tx, String id, ByteSequence bs) throws IOException {
- Location location = this.store.write(bs, false);
+ private void addFirst(Transaction tx, String id, ByteSequence bs, Location location) throws IOException {
EntryLocation entry = createEntry(tx, id, EntryLocation.NOT_SET, EntryLocation.NOT_SET);
entry.setLocation(location);
EntryLocation oldFirst = getFirst(tx);
@@ -209,42 +216,50 @@ public class PList {
synchronized public boolean remove(final String id) throws IOException {
final AtomicBoolean result = new AtomicBoolean();
- this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- result.set(remove(tx, id));
- }
- });
+ synchronized (indexLock) {
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ result.set(remove(tx, id));
+ }
+ });
+ }
return result.get();
}
synchronized public boolean remove(final int position) throws IOException {
final AtomicBoolean result = new AtomicBoolean();
- this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- result.set(remove(tx, position));
- }
- });
+ synchronized (indexLock) {
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ result.set(remove(tx, position));
+ }
+ });
+ }
return result.get();
}
synchronized public boolean remove(final PListEntry entry) throws IOException {
final AtomicBoolean result = new AtomicBoolean();
- this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- result.set(doRemove(tx, entry.getEntry()));
- }
- });
+ synchronized (indexLock) {
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ result.set(doRemove(tx, entry.getEntry()));
+ }
+ });
+ }
return result.get();
}
synchronized public PListEntry get(final int position) throws IOException {
PListEntry result = null;
final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
- this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- ref.set(get(tx, position));
- }
- });
+ synchronized (indexLock) {
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ ref.set(get(tx, position));
+ }
+ });
+ }
if (ref.get() != null) {
ByteSequence bs = this.store.getPayload(ref.get().getLocation());
result = new PListEntry(ref.get(), bs);
@@ -255,14 +270,16 @@ public class PList {
synchronized public PListEntry getFirst() throws IOException {
PListEntry result = null;
final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
- this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- ref.set(getFirst(tx));
+ synchronized (indexLock) {
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ ref.set(getFirst(tx));
+ }
+ });
+ if (ref.get() != null) {
+ ByteSequence bs = this.store.getPayload(ref.get().getLocation());
+ result = new PListEntry(ref.get(), bs);
}
- });
- if (ref.get() != null) {
- ByteSequence bs = this.store.getPayload(ref.get().getLocation());
- result = new PListEntry(ref.get(), bs);
}
return result;
}
@@ -270,14 +287,16 @@ public class PList {
synchronized public PListEntry getLast() throws IOException {
PListEntry result = null;
final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
- this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- ref.set(getLast(tx));
+ synchronized (indexLock) {
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ ref.set(getLast(tx));
+ }
+ });
+ if (ref.get() != null) {
+ ByteSequence bs = this.store.getPayload(ref.get().getLocation());
+ result = new PListEntry(ref.get(), bs);
}
- });
- if (ref.get() != null) {
- ByteSequence bs = this.store.getPayload(ref.get().getLocation());
- result = new PListEntry(ref.get(), bs);
}
return result;
}
@@ -287,14 +306,16 @@ public class PList {
final long nextId = entry != null ? entry.getEntry().getNext() : this.rootId;
if (nextId != EntryLocation.NOT_SET) {
final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
- this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- ref.set(getNext(tx, nextId));
+ synchronized (indexLock) {
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ ref.set(getNext(tx, nextId));
+ }
+ });
+ if (ref.get() != null) {
+ ByteSequence bs = this.store.getPayload(ref.get().getLocation());
+ result = new PListEntry(ref.get(), bs);
}
- });
- if (ref.get() != null) {
- ByteSequence bs = this.store.getPayload(ref.get().getLocation());
- result = new PListEntry(ref.get(), bs);
}
}
return result;
@@ -303,14 +324,15 @@ public class PList {
synchronized public PListEntry refresh(final PListEntry entry) throws IOException {
PListEntry result = null;
final AtomicReference<EntryLocation> ref = new AtomicReference<EntryLocation>();
- this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
- public void execute(Transaction tx) throws IOException {
- ref.set(loadEntry(tx, entry.getEntry().getPage().getPageId()));
+ synchronized (indexLock) {
+ this.store.getPageFile().tx().execute(new Transaction.Closure<IOException>() {
+ public void execute(Transaction tx) throws IOException {
+ ref.set(loadEntry(tx, entry.getEntry().getPage().getPageId()));
+ }
+ });
+ if (ref.get() != null) {
+ result = new PListEntry(ref.get(), entry.getByteSequence());
}
- });
- if (ref.get() != null) {
-
- result = new PListEntry(ref.get(), entry.getByteSequence());
}
return result;
}
@@ -390,7 +412,7 @@ public class PList {
return null;
}
- boolean doRemove(Transaction tx, EntryLocation entry) throws IOException {
+ private boolean doRemove(Transaction tx, EntryLocation entry) throws IOException {
boolean result = false;
if (entry != null) {
@@ -450,6 +472,7 @@ public class PList {
}
return entry;
}
+
private void storeEntry(Transaction tx, EntryLocation entry) throws IOException {
tx.store(entry.getPage(), EntryLocationMarshaller.INSTANCE, true);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java?rev=997849&r1=997848&r2=997849&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/store/kahadb/plist/PListStore.java Thu Sep 16 17:09:22 2010
@@ -45,6 +45,9 @@ import org.apache.kahadb.util.LockFile;
import org.apache.kahadb.util.StringMarshaller;
import org.apache.kahadb.util.VariableMarshaller;
+/**
+ * @org.apache.xbean.XBean
+ */
public class PListStore extends ServiceSupport {
static final Log LOG = LogFactory.getLog(PListStore.class);
private static final int DATABASE_LOCKED_WAIT_DELAY = 10 * 1000;
@@ -65,6 +68,11 @@ public class PListStore extends ServiceS
MetaData metaData = new MetaData(this);
final MetaDataMarshaller metaDataMarshaller = new MetaDataMarshaller(this);
Map<String, PList> persistentLists = new HashMap<String, PList>();
+ final Object indexLock = new Object();
+
+ public Object getIndexLock() {
+ return indexLock;
+ }
protected class MetaData {
protected MetaData(PListStore store) {
@@ -188,7 +196,7 @@ public class PListStore extends ServiceS
}
}
- public PList getPList(final String name) throws Exception {
+ synchronized public PList getPList(final String name) throws Exception {
if (!isStarted()) {
throw new IllegalStateException("Not started");
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java?rev=997849&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java Thu Sep 16 17:09:22 2010
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.util.Vector;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.jms.ConnectionFactory;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsMultipleClientsTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+
+public class AMQ2910Test extends JmsMultipleClientsTestSupport {
+
+ final int maxConcurrency = 60;
+ final int msgCount = 200;
+ final Vector<Throwable> exceptions = new Vector<Throwable>();
+
+ @Override
+ protected BrokerService createBroker() throws Exception {
+ //persistent = true;
+ BrokerService broker = new BrokerService();
+ broker.setDeleteAllMessagesOnStartup(true);
+ broker.addConnector("tcp://localhost:0");
+ PolicyMap policyMap = new PolicyMap();
+ PolicyEntry defaultEntry = new PolicyEntry();
+ defaultEntry.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy());
+ defaultEntry.setCursorMemoryHighWaterMark(50);
+ defaultEntry.setMemoryLimit(500*1024);
+ defaultEntry.setProducerFlowControl(false);
+ policyMap.setDefaultEntry(defaultEntry);
+ broker.setDestinationPolicy(policyMap);
+
+ broker.getSystemUsage().getMemoryUsage().setLimit(1000 * 1024);
+
+ return broker;
+ }
+
+ public void testConcurrentSendToPendingCursor() throws Exception {
+ final ActiveMQConnectionFactory factory =
+ new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri());
+ factory.setCloseTimeout(30000);
+ ExecutorService executor = Executors.newCachedThreadPool();
+ for (int i=0; i<maxConcurrency; i++) {
+ final ActiveMQQueue dest = new ActiveMQQueue("Queue-" + i);
+ executor.execute(new Runnable() {
+ public void run() {
+ try {
+ sendMessages(factory.createConnection(), dest, msgCount);
+ } catch (Throwable t) {
+ exceptions.add(t);
+ }
+ }
+ });
+ }
+
+ executor.shutdown();
+
+
+ assertTrue("send completed", executor.awaitTermination(60, TimeUnit.SECONDS));
+ assertNoExceptions();
+
+
+ executor = Executors.newCachedThreadPool();
+ for (int i=0; i<maxConcurrency; i++) {
+ final ActiveMQQueue dest = new ActiveMQQueue("Queue-" + i);
+ executor.execute(new Runnable() {
+ public void run() {
+ try {
+ startConsumers(factory, dest);
+ } catch (Throwable t) {
+ exceptions.add(t);
+ }
+ }
+ });
+ }
+
+ executor.shutdown();
+ assertTrue("consumers completed", executor.awaitTermination(60, TimeUnit.SECONDS));
+
+ allMessagesList.setMaximumDuration(120*1000);
+ final int numExpected = maxConcurrency * msgCount;
+ allMessagesList.waitForMessagesToArrive(numExpected);
+
+ if (allMessagesList.getMessageCount() != numExpected) {
+ dumpAllThreads(getName());
+ }
+ allMessagesList.assertMessagesReceivedNoWait(numExpected);
+
+ assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+
+ }
+
+ private void assertNoExceptions() {
+ if (!exceptions.isEmpty()) {
+ for (Throwable t: exceptions) {
+ t.printStackTrace();
+ }
+ }
+ assertTrue("no exceptions: " + exceptions, exceptions.isEmpty());
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2910Test.java
------------------------------------------------------------------------------
svn:keywords = Rev Date