You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2012/03/27 13:04:05 UTC
svn commit: r1305809 [1/3] - in /qpid/trunk/qpid/java: ./ bdbstore/
bdbstore/bin/ bdbstore/src/main/java/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/
bdbstore/src...
Author: kwall
Date: Tue Mar 27 11:04:02 2012
New Revision: 1305809
URL: http://svn.apache.org/viewvc?rev=1305809&view=rev
Log:
QPID-3913: Add functionality to upgrade bdbstore automatically on broker start-up. Store message content using single chunk. Change store version to 6. Remove implementations of tuple bindings for previous versions.
Applied patch from Phil Harvey<ph...@philharveyonline.com> Oleksandr Rudyy<or...@gmail.com>
Added:
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/BindingRecord.java
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/BindingRecord.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/ExchangeRecord.java
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/PreparedTransaction.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueEntryKey.java
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueRecord.java
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/Xid.java
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/Xid.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/AMQShortStringBinding.java
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ContentBinding.java
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ExchangeBinding.java
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/PreparedTransactionTB.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBinding.java
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBindingTupleBinding.java
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueEntryBinding.java
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/StringMapBinding.java
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StringMapBinding.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/UUIDTupleBinding.java
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/UUIDTupleBinding.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/XidBinding.java
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/XidTB.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractStoreUpgrade.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorOperation.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/CursorTemplate.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseCallable.java
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseEntryCallback.java
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseRunnable.java
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplate.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/StoreUpgrade.java
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4To5.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionHandler.java
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeInteractionResponse.java
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/upgrade/Upgrader.java
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/AbstractUpgradeTestCase.java
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/DatabaseTemplateTest.java
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom4to5Test.java
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgradeFrom5To6Test.java
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/upgrade/UpgraderTest.java
qpid/trunk/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/
qpid/trunk/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/
qpid/trunk/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v4/test-store/00000000.jdb
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb
qpid/trunk/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/
qpid/trunk/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/readme.txt
qpid/trunk/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/
qpid/trunk/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000000.jdb
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb
qpid/trunk/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-v5/test-store/00000001.jdb
- copied, changed from r1305371, qpid/trunk/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb
Removed:
qpid/trunk/qpid/java/bdbstore/bin/storeUpgrade.sh
qpid/trunk/qpid/java/bdbstore/src/main/java/BDBStoreUpgrade.log4j.xml
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/DatabaseVisitor.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageContentKey.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StringMapBinding.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/UUIDTupleBinding.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_4.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/MessageContentKey_5.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/Xid.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/BindingRecord.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/PreparedTransaction.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTupleBindingFactory.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_4.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTB_5.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageContentKeyTupleBindingFactory.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/PreparedTransactionTB.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/XidTB.java
qpid/trunk/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb
Modified:
qpid/trunk/qpid/java/bdbstore/build.xml
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java
qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/logging/actors/CurrentActor.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/FieldTableSupport.java
qpid/trunk/qpid/java/module.xml
Modified: qpid/trunk/qpid/java/bdbstore/build.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/build.xml?rev=1305809&r1=1305808&r2=1305809&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/build.xml (original)
+++ qpid/trunk/qpid/java/bdbstore/build.xml Tue Mar 27 11:04:02 2012
@@ -81,4 +81,11 @@ http://www.oracle.com/technetwork/databa
<fileset dir="src/test/resources/upgrade"/>
</copy>
</target>
+
+ <target name="precompile-tests">
+ <mkdir dir="${module.test.resources}"/>
+ <copy todir="${module.test.resources}">
+ <fileset dir="src/test/resources"/>
+ </copy>
+ </target>
</project>
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java?rev=1305809&r1=1305808&r2=1305809&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncoding.java Tue Mar 27 11:04:02 2012
@@ -33,7 +33,7 @@ public class AMQShortStringEncoding
public static AMQShortString readShortString(TupleInput tupleInput)
{
- int length = (int) tupleInput.readShort();
+ int length = tupleInput.readShort();
if (length < 0)
{
return null;
Modified: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java?rev=1305809&r1=1305808&r2=1305809&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java Tue Mar 27 11:04:02 2012
@@ -20,33 +20,29 @@
*/
package org.apache.qpid.server.store.berkeleydb;
-import com.sleepycat.bind.EntryBinding;
-import com.sleepycat.bind.tuple.ByteBinding;
-import com.sleepycat.bind.tuple.IntegerBinding;
-import com.sleepycat.bind.tuple.LongBinding;
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.je.CheckpointConfig;
-import com.sleepycat.je.Cursor;
-import com.sleepycat.je.Database;
-import com.sleepycat.je.DatabaseConfig;
-import com.sleepycat.je.DatabaseEntry;
-import com.sleepycat.je.DatabaseException;
-import com.sleepycat.je.Environment;
-import com.sleepycat.je.EnvironmentConfig;
-import com.sleepycat.je.LockConflictException;
-import com.sleepycat.je.LockMode;
-import com.sleepycat.je.OperationStatus;
-import com.sleepycat.je.Transaction;
-import com.sleepycat.je.TransactionConfig;
+import java.io.File;
+import java.lang.ref.SoftReference;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.federation.Bridge;
import org.apache.qpid.server.federation.BrokerLink;
+import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
@@ -68,33 +64,39 @@ import org.apache.qpid.server.store.Stor
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler.QueueEntryRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_5;
-import org.apache.qpid.server.store.berkeleydb.keys.Xid;
-import org.apache.qpid.server.store.berkeleydb.records.BindingRecord;
-import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
-import org.apache.qpid.server.store.berkeleydb.records.PreparedTransaction;
-import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
-import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
-import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTB_5;
-import org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory;
-import org.apache.qpid.server.store.berkeleydb.tuples.PreparedTransactionTB;
-import org.apache.qpid.server.store.berkeleydb.tuples.QueueEntryTB;
-import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
-import org.apache.qpid.server.store.berkeleydb.tuples.XidTB;
+import org.apache.qpid.server.store.berkeleydb.entry.BindingRecord;
+import org.apache.qpid.server.store.berkeleydb.entry.ExchangeRecord;
+import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
+import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
+import org.apache.qpid.server.store.berkeleydb.entry.QueueRecord;
+import org.apache.qpid.server.store.berkeleydb.entry.Xid;
+import org.apache.qpid.server.store.berkeleydb.tuple.AMQShortStringBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.ContentBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.ExchangeBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.MessageMetaDataBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.PreparedTransactionBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.QueueBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.QueueBindingTupleBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.QueueEntryBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.StringMapBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.UUIDTupleBinding;
+import org.apache.qpid.server.store.berkeleydb.tuple.XidBinding;
+import org.apache.qpid.server.store.berkeleydb.upgrade.Upgrader;
-import java.io.File;
-import java.lang.ref.SoftReference;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Random;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
+import com.sleepycat.bind.tuple.ByteBinding;
+import com.sleepycat.bind.tuple.LongBinding;
+import com.sleepycat.je.CheckpointConfig;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockConflictException;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.TransactionConfig;
/**
* BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
@@ -111,21 +113,22 @@ public class BDBMessageStore implements
private static final int LOCK_RETRY_ATTEMPTS = 5;
- static final int DATABASE_FORMAT_VERSION = 5;
- private static final String DATABASE_FORMAT_VERSION_PROPERTY = "version";
+ public static final int VERSION = 6;
+
public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
private Environment _environment;
- private String MESSAGEMETADATADB_NAME = "messageMetaDataDb";
- private String MESSAGECONTENTDB_NAME = "messageContentDb";
- private String QUEUEBINDINGSDB_NAME = "queueBindingsDb";
- private String DELIVERYDB_NAME = "deliveryDb";
- private String EXCHANGEDB_NAME = "exchangeDb";
- private String QUEUEDB_NAME = "queueDb";
- private String BRIDGEDB_NAME = "bridges";
- private String LINKDB_NAME = "links";
- private String XIDDB_NAME = "xids";
+ private String MESSAGEMETADATADB_NAME = "MESSAGE_METADATA";
+ private String MESSAGECONTENTDB_NAME = "MESSAGE_CONTENT";
+ private String QUEUEBINDINGSDB_NAME = "QUEUE_BINDINGS";
+ private String DELIVERYDB_NAME = "DELIVERIES";
+ private String EXCHANGEDB_NAME = "EXCHANGES";
+ private String QUEUEDB_NAME = "QUEUES";
+ private String BRIDGEDB_NAME = "BRIDGES";
+ private String LINKDB_NAME = "LINKS";
+ private String XIDDB_NAME = "XIDS";
+
private Database _messageMetaDataDb;
private Database _messageContentDb;
@@ -168,13 +171,6 @@ public class BDBMessageStore implements
private final CommitThread _commitThread = new CommitThread("Commit-Thread");
- // Factory Classes to create the TupleBinding objects that reflect the version instance of this BDBStore
- private MessageMetaDataTupleBindingFactory _metaDataTupleBindingFactory;
- private QueueTupleBindingFactory _queueTupleBindingFactory;
- private BindingTupleBindingFactory _bindingTupleBindingFactory;
-
- /** The data version this store should run with */
- private int _version;
private enum State
{
INITIAL,
@@ -197,37 +193,8 @@ public class BDBMessageStore implements
public BDBMessageStore()
{
- this(DATABASE_FORMAT_VERSION);
}
- public BDBMessageStore(int version)
- {
- _version = version;
- }
-
- private void setDatabaseNames(int version)
- {
- if (version > 1)
- {
- MESSAGEMETADATADB_NAME += "_v" + version;
-
- MESSAGECONTENTDB_NAME += "_v" + version;
-
- QUEUEDB_NAME += "_v" + version;
-
- DELIVERYDB_NAME += "_v" + version;
-
- EXCHANGEDB_NAME += "_v" + version;
-
- QUEUEBINDINGSDB_NAME += "_v" + version;
-
- LINKDB_NAME += "_v" + version;
-
- BRIDGEDB_NAME += "_v" + version;
-
- XIDDB_NAME += "_v" + version;
- }
- }
public void configureConfigStore(String name,
ConfigurationRecoveryHandler recoveryHandler,
@@ -281,7 +248,7 @@ public class BDBMessageStore implements
}
recoverQueueEntries(recoveryHandler);
-
+
}
@@ -313,13 +280,16 @@ public class BDBMessageStore implements
}
}
- CurrentActor.get().message(_logSubject, MessageStoreMessages.STORE_LOCATION(environmentPath.getAbsolutePath()));
-
- _version = storeConfig.getInt(DATABASE_FORMAT_VERSION_PROPERTY, DATABASE_FORMAT_VERSION);
+ message(MessageStoreMessages.STORE_LOCATION(environmentPath.getAbsolutePath()));
return configure(environmentPath, false);
}
+ void message(final LogMessage message)
+ {
+ CurrentActor.message(_logSubject, message);
+ }
+
/**
* @param environmentPath location for the store to be created in/recovered from
* @param readonly if true then don't allow modifications to an existing store, and don't create a new store if none exists
@@ -334,20 +304,9 @@ public class BDBMessageStore implements
_log.info("Configuring BDB message store");
- createTupleBindingFactories(_version);
-
- setDatabaseNames(_version);
-
return setupStore(environmentPath, readonly);
}
- private void createTupleBindingFactories(int version)
- {
- _bindingTupleBindingFactory = new BindingTupleBindingFactory(version);
- _queueTupleBindingFactory = new QueueTupleBindingFactory(version);
- _metaDataTupleBindingFactory = new MessageMetaDataTupleBindingFactory(version);
- }
-
/**
* Move the store state from CONFIGURING to STARTED.
*
@@ -366,7 +325,7 @@ public class BDBMessageStore implements
boolean newEnvironment = createEnvironment(storePath, readonly);
- verifyVersionByTables();
+ new Upgrader(_environment, _logSubject).upgradeIfNecessary();
openDatabases(readonly);
@@ -378,40 +337,6 @@ public class BDBMessageStore implements
return newEnvironment;
}
- private void verifyVersionByTables() throws DatabaseException
- {
- for (String s : _environment.getDatabaseNames())
- {
- int versionIndex = s.indexOf("_v");
-
- // lack of _v index suggests DB is v1
- // so if _version is not v1 then error
- if (versionIndex == -1)
- {
- if (_version != 1)
- {
- closeEnvironment();
- throw new IllegalArgumentException("Error: Unable to load BDBStore as version " + _version
- + ". Store on disk contains version 1 data.");
- }
- else // DB is v1 and _version is v1
- {
- continue;
- }
- }
-
- // Otherwise Check Versions
- int version = Integer.parseInt(s.substring(versionIndex + 2));
-
- if (version != _version)
- {
- closeEnvironment();
- throw new IllegalArgumentException("Error: Unable to load BDBStore as version " + _version
- + ". Store on disk contains version " + version + " data.");
- }
- }
- }
-
private synchronized void stateTransition(State requiredState, State newState) throws AMQStoreException
{
if (_state != requiredState)
@@ -586,7 +511,7 @@ public class BDBMessageStore implements
_state = State.CLOSED;
- CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
+ message(MessageStoreMessages.CLOSED());
}
private void closeEnvironment() throws DatabaseException
@@ -609,7 +534,7 @@ public class BDBMessageStore implements
{
stateTransition(State.CONFIGURED, State.RECOVERING);
- CurrentActor.get().message(_logSubject,MessageStoreMessages.RECOVERY_START());
+ message(MessageStoreMessages.RECOVERY_START());
try
{
@@ -641,10 +566,10 @@ public class BDBMessageStore implements
cursor = _queueDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
- TupleBinding binding = _queueTupleBindingFactory.getInstance();
+ QueueBinding binding = QueueBinding.getInstance();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
- QueueRecord queueRecord = (QueueRecord) binding.entryToObject(value);
+ QueueRecord queueRecord = binding.entryToObject(value);
String queueName = queueRecord.getNameShortString() == null ? null :
queueRecord.getNameShortString().asString();
@@ -677,11 +602,11 @@ public class BDBMessageStore implements
cursor = _exchangeDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
- TupleBinding binding = new ExchangeTB();
+ ExchangeBinding binding = ExchangeBinding.getInstance();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
- ExchangeRecord exchangeRec = (ExchangeRecord) binding.entryToObject(value);
+ ExchangeRecord exchangeRec = binding.entryToObject(value);
String exchangeName = exchangeRec.getNameShortString() == null ? null :
exchangeRec.getNameShortString().asString();
@@ -710,7 +635,7 @@ public class BDBMessageStore implements
cursor = _queueBindingsDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
- TupleBinding<BindingRecord> binding = _bindingTupleBindingFactory.getInstance();
+ QueueBindingTupleBinding binding = QueueBindingTupleBinding.getInstance();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
@@ -818,14 +743,14 @@ public class BDBMessageStore implements
cursor = _messageMetaDataDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
DatabaseEntry value = new DatabaseEntry();
- EntryBinding valueBinding = _metaDataTupleBindingFactory.getInstance();
+ MessageMetaDataBinding valueBinding = MessageMetaDataBinding.getInstance();
long maxId = 0;
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
long messageId = LongBinding.entryToLong(key);
- StorableMessageMetaData metaData = (StorableMessageMetaData) valueBinding.entryToObject(value);
+ StorableMessageMetaData metaData = valueBinding.entryToObject(value);
StoredBDBMessage message = new StoredBDBMessage(messageId, metaData, false);
mrh.message(message);
@@ -861,13 +786,13 @@ public class BDBMessageStore implements
{
cursor = _deliveryDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new QueueEntryTB();
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
DatabaseEntry value = new DatabaseEntry();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
- QueueEntryKey qek = (QueueEntryKey) keyBinding.entryToObject(key);
+ QueueEntryKey qek = keyBinding.entryToObject(key);
entries.add(qek);
}
@@ -902,8 +827,8 @@ public class BDBMessageStore implements
}
}
-
-
+
+
TransactionLogRecoveryHandler.DtxRecordRecoveryHandler dtxrh = qerh.completeQueueEntryRecovery();
cursor = null;
@@ -911,8 +836,8 @@ public class BDBMessageStore implements
{
cursor = _xidDb.openCursor(null, null);
DatabaseEntry key = new DatabaseEntry();
- XidTB keyBinding = new XidTB();
- PreparedTransactionTB valueBinding = new PreparedTransactionTB();
+ XidBinding keyBinding = XidBinding.getInstance();
+ PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
DatabaseEntry value = new DatabaseEntry();
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
@@ -946,7 +871,7 @@ public class BDBMessageStore implements
}
}
-
+
dtxrh.completeDtxRecordRecovery();
}
@@ -1001,35 +926,14 @@ public class BDBMessageStore implements
}
//now remove the content data from the store if there is any.
+ DatabaseEntry contentKeyEntry = new DatabaseEntry();
+ LongBinding.longToEntry(messageId, contentKeyEntry);
+ _messageContentDb.delete(tx, contentKeyEntry);
-
-
- int offset = 0;
- do
+ if (_log.isDebugEnabled())
{
- DatabaseEntry contentKeyEntry = new DatabaseEntry();
- MessageContentKey_5 mck = new MessageContentKey_5(messageId,offset);
- TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5();
- contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
- //Use a partial record for the value to prevent retrieving the
- //data itself as we only need the key to identify what to remove.
- DatabaseEntry value = new DatabaseEntry();
- value.setPartial(0, 4, true);
-
- status = _messageContentDb.get(null,contentKeyEntry, value, LockMode.READ_COMMITTED);
-
- if(status == OperationStatus.SUCCESS)
- {
-
- offset += IntegerBinding.entryToInt(value);
- _messageContentDb.delete(tx, contentKeyEntry);
- if (_log.isDebugEnabled())
- {
- _log.debug("Deleted content chunk offset " + mck.getOffset() + " for message " + messageId);
- }
- }
+ _log.debug("Deleted content for message " + messageId);
}
- while (status == OperationStatus.SUCCESS);
commit(tx, sync);
complete = true;
@@ -1128,11 +1032,11 @@ public class BDBMessageStore implements
exchange.getTypeShortString(), exchange.isAutoDelete());
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
+ AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance();
keyBinding.objectToEntry(exchange.getNameShortString(), key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding exchangeBinding = new ExchangeTB();
+ ExchangeBinding exchangeBinding = ExchangeBinding.getInstance();
exchangeBinding.objectToEntry(exchangeRec, value);
try
@@ -1152,7 +1056,7 @@ public class BDBMessageStore implements
public void removeExchange(Exchange exchange) throws AMQStoreException
{
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
+ AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance();
keyBinding.objectToEntry(exchange.getNameShortString(), key);
try
{
@@ -1182,7 +1086,7 @@ public class BDBMessageStore implements
if (_state != State.RECOVERING)
{
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance();
+ QueueBindingTupleBinding keyBinding = QueueBindingTupleBinding.getInstance();
keyBinding.objectToEntry(bindingRecord, key);
@@ -1211,7 +1115,7 @@ public class BDBMessageStore implements
throws AMQStoreException
{
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = _bindingTupleBindingFactory.getInstance();
+ QueueBindingTupleBinding keyBinding = QueueBindingTupleBinding.getInstance();
keyBinding.objectToEntry(new BindingRecord(exchange.getNameShortString(), queue.getNameShortString(), routingKey, args), key);
try
@@ -1268,11 +1172,11 @@ public class BDBMessageStore implements
if (_state != State.RECOVERING)
{
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
+ AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance();
keyBinding.objectToEntry(queueRecord.getNameShortString(), key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding queueBinding = _queueTupleBindingFactory.getInstance();
+ QueueBinding queueBinding = QueueBinding.getInstance();
queueBinding.objectToEntry(queueRecord, value);
try
@@ -1306,18 +1210,18 @@ public class BDBMessageStore implements
try
{
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
+ AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance();
keyBinding.objectToEntry(queue.getNameShortString(), key);
DatabaseEntry value = new DatabaseEntry();
DatabaseEntry newValue = new DatabaseEntry();
- TupleBinding queueBinding = _queueTupleBindingFactory.getInstance();
+ QueueBinding queueBinding = QueueBinding.getInstance();
OperationStatus status = _queueDb.get(null, key, value, LockMode.DEFAULT);
if(status == OperationStatus.SUCCESS)
{
//read the existing record and apply the new exclusivity setting
- QueueRecord queueRecord = (QueueRecord) queueBinding.entryToObject(value);
+ QueueRecord queueRecord = queueBinding.entryToObject(value);
queueRecord.setExclusive(queue.isExclusive());
//write the updated entry to the store
@@ -1353,7 +1257,7 @@ public class BDBMessageStore implements
}
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new AMQShortStringTB();
+ AMQShortStringBinding keyBinding = AMQShortStringBinding.getInstance();
keyBinding.objectToEntry(name, key);
try
{
@@ -1468,7 +1372,7 @@ public class BDBMessageStore implements
AMQShortString name = AMQShortString.valueOf(queue.getResourceName());
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new QueueEntryTB();
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
QueueEntryKey dd = new QueueEntryKey(name, messageId);
keyBinding.objectToEntry(dd, key);
DatabaseEntry value = new DatabaseEntry();
@@ -1505,10 +1409,10 @@ public class BDBMessageStore implements
AMQShortString name = new AMQShortString(queue.getResourceName());
DatabaseEntry key = new DatabaseEntry();
- EntryBinding keyBinding = new QueueEntryTB();
- QueueEntryKey dd = new QueueEntryKey(name, messageId);
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
+ QueueEntryKey queueEntryKey = new QueueEntryKey(name, messageId);
- keyBinding.objectToEntry(dd, key);
+ keyBinding.objectToEntry(queueEntryKey, key);
if (_log.isDebugEnabled())
{
@@ -1545,21 +1449,21 @@ public class BDBMessageStore implements
}
- private void recordXid(com.sleepycat.je.Transaction txn,
- long format,
- byte[] globalId,
- byte[] branchId,
+ private void recordXid(com.sleepycat.je.Transaction txn,
+ long format,
+ byte[] globalId,
+ byte[] branchId,
Transaction.Record[] enqueues,
Transaction.Record[] dequeues) throws AMQStoreException
{
DatabaseEntry key = new DatabaseEntry();
Xid xid = new Xid(format, globalId, branchId);
- XidTB keyBinding = new XidTB();
+ XidBinding keyBinding = XidBinding.getInstance();
keyBinding.objectToEntry(xid,key);
DatabaseEntry value = new DatabaseEntry();
PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues);
- PreparedTransactionTB valueBinding = new PreparedTransactionTB();
+ PreparedTransactionBinding valueBinding = new PreparedTransactionBinding();
valueBinding.objectToEntry(preparedTransaction, value);
try
@@ -1578,8 +1482,8 @@ public class BDBMessageStore implements
{
DatabaseEntry key = new DatabaseEntry();
Xid xid = new Xid(format, globalId, branchId);
- XidTB keyBinding = new XidTB();
-
+ XidBinding keyBinding = XidBinding.getInstance();
+
keyBinding.objectToEntry(xid, key);
@@ -1606,7 +1510,7 @@ public class BDBMessageStore implements
throw new AMQStoreException("Error accessing database while removing xid: " + e.getMessage(), e);
}
}
-
+
/**
* Commits all operations performed within a given transaction.
*
@@ -1681,7 +1585,7 @@ public class BDBMessageStore implements
QueueEntryKey dd = new QueueEntryKey(queueName, 0);
- EntryBinding keyBinding = new QueueEntryTB();
+ QueueEntryBinding keyBinding = QueueEntryBinding.getInstance();
keyBinding.objectToEntry(dd, key);
DatabaseEntry value = new DatabaseEntry();
@@ -1689,7 +1593,7 @@ public class BDBMessageStore implements
LinkedList<Long> messageIds = new LinkedList<Long>();
OperationStatus status = cursor.getSearchKeyRange(key, value, LockMode.DEFAULT);
- dd = (QueueEntryKey) keyBinding.entryToObject(key);
+ dd = keyBinding.entryToObject(key);
while ((status == OperationStatus.SUCCESS) && dd.getQueueName().equals(queueName))
{
@@ -1698,7 +1602,7 @@ public class BDBMessageStore implements
status = cursor.getNext(key, value, LockMode.DEFAULT);
if (status == OperationStatus.SUCCESS)
{
- dd = (QueueEntryKey) keyBinding.entryToObject(key);
+ dd = keyBinding.entryToObject(key);
}
}
@@ -1739,32 +1643,29 @@ public class BDBMessageStore implements
*
* @param tx The transaction for the operation.
* @param messageId The message to store the data for.
- * @param offset The offset of the data chunk in the message.
* @param contentBody The content of the data chunk.
*
* @throws AMQStoreException If the operation fails for any reason, or if the specified message does not exist.
*/
- protected void addContent(final com.sleepycat.je.Transaction tx, long messageId, int offset,
+ protected void addContent(final com.sleepycat.je.Transaction tx, long messageId,
ByteBuffer contentBody) throws AMQStoreException
{
DatabaseEntry key = new DatabaseEntry();
- TupleBinding<MessageContentKey> keyBinding = new MessageContentKeyTB_5();
- keyBinding.objectToEntry(new MessageContentKey_5(messageId, offset), key);
+ LongBinding.longToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding<ByteBuffer> messageBinding = new ContentTB();
- messageBinding.objectToEntry(contentBody, value);
+ ContentBinding messageBinding = ContentBinding.getInstance();
+ messageBinding.objectToEntry(contentBody.array(), value);
try
{
OperationStatus status = _messageContentDb.put(tx, key, value);
if (status != OperationStatus.SUCCESS)
{
- throw new AMQStoreException("Error adding content chunk offset" + offset + " for message id " + messageId + ": "
- + status);
+ throw new AMQStoreException("Error adding content for message id " + messageId + ": " + status);
}
if (_log.isDebugEnabled())
{
- _log.debug("Storing content chunk offset" + offset + " for message " + messageId + "[Transaction" + tx + "]");
+ _log.debug("Storing content for message " + messageId + "[Transaction" + tx + "]");
}
}
catch (DatabaseException e)
@@ -1796,7 +1697,7 @@ public class BDBMessageStore implements
LongBinding.longToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
+ MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance();
messageBinding.objectToEntry(messageMetaData, value);
try
{
@@ -1832,7 +1733,7 @@ public class BDBMessageStore implements
DatabaseEntry key = new DatabaseEntry();
LongBinding.longToEntry(messageId, key);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding messageBinding = _metaDataTupleBindingFactory.getInstance();
+ MessageMetaDataBinding messageBinding = MessageMetaDataBinding.getInstance();
try
{
@@ -1842,7 +1743,7 @@ public class BDBMessageStore implements
throw new AMQStoreException("Metadata not found for message with id " + messageId);
}
- StorableMessageMetaData mdd = (StorableMessageMetaData) messageBinding.entryToObject(value);
+ StorableMessageMetaData mdd = messageBinding.entryToObject(value);
return mdd;
}
@@ -1868,87 +1769,41 @@ public class BDBMessageStore implements
{
DatabaseEntry contentKeyEntry = new DatabaseEntry();
- //Start from 0 offset and search for the starting chunk.
- MessageContentKey_5 mck = new MessageContentKey_5(messageId, 0);
- TupleBinding<MessageContentKey> contentKeyTupleBinding = new MessageContentKeyTB_5();
- contentKeyTupleBinding.objectToEntry(mck, contentKeyEntry);
+ LongBinding.longToEntry(messageId, contentKeyEntry);
DatabaseEntry value = new DatabaseEntry();
- TupleBinding<ByteBuffer> contentTupleBinding = new ContentTB();
+ ContentBinding contentTupleBinding = ContentBinding.getInstance();
if (_log.isDebugEnabled())
{
_log.debug("Message Id: " + messageId + " Getting content body from offset: " + offset);
}
- int written = 0;
- int seenSoFar = 0;
-
- Cursor cursor = null;
try
{
- cursor = _messageContentDb.openCursor(null, null);
-
- OperationStatus status = cursor.getSearchKeyRange(contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
-
- while (status == OperationStatus.SUCCESS)
+ int written = 0;
+ OperationStatus status = _messageContentDb.get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED);
+ if (status == OperationStatus.SUCCESS)
{
- mck = (MessageContentKey_5) contentKeyTupleBinding.entryToObject(contentKeyEntry);
- long id = mck.getMessageId();
-
- if(id != messageId)
+ byte[] dataAsBytes = contentTupleBinding.entryToObject(value);
+ int size = dataAsBytes.length;
+ if (offset > size)
{
- //we have exhausted all chunks for this message id, break
- break;
+ throw new RuntimeException("Offset " + offset + " is greater than message size " + size
+ + " for message id " + messageId + "!");
}
- int offsetInMessage = mck.getOffset();
- ByteBuffer buf = (ByteBuffer) contentTupleBinding.entryToObject(value);
-
- final int size = (int) buf.limit();
-
- seenSoFar += size;
-
- if(seenSoFar >= offset)
+ written = size - offset;
+ if(written > dst.remaining())
{
- byte[] dataAsBytes = buf.array();
-
- int posInArray = offset + written - offsetInMessage;
- int count = size - posInArray;
- if(count > dst.remaining())
- {
- count = dst.remaining();
- }
- dst.put(dataAsBytes,posInArray,count);
- written+=count;
-
- if(dst.remaining() == 0)
- {
- break;
- }
+ written = dst.remaining();
}
-
- status = cursor.getNext(contentKeyEntry, value, LockMode.RMW);
+ dst.put(dataAsBytes, offset, written);
}
-
return written;
}
catch (DatabaseException e)
{
- throw new AMQStoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
- }
- finally
- {
- if(cursor != null)
- {
- try
- {
- cursor.close();
- }
- catch (DatabaseException e)
- {
- throw new AMQStoreException("Error writing AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
- }
- }
+ throw new AMQStoreException("Error getting AMQMessage with id " + messageId + " to database: " + e.getMessage(), e);
}
}
@@ -1961,7 +1816,7 @@ public class BDBMessageStore implements
{
if(metaData.isPersistent())
{
- return new StoredBDBMessage(getNewMessageId(), metaData);
+ return (StoredMessage<T>) new StoredBDBMessage(getNewMessageId(), metaData);
}
else
{
@@ -1970,23 +1825,6 @@ public class BDBMessageStore implements
}
- //protected getters for the TupleBindingFactories
-
- protected QueueTupleBindingFactory getQueueTupleBindingFactory()
- {
- return _queueTupleBindingFactory;
- }
-
- protected BindingTupleBindingFactory getBindingTupleBindingFactory()
- {
- return _bindingTupleBindingFactory;
- }
-
- protected MessageMetaDataTupleBindingFactory getMetaDataTupleBindingFactory()
- {
- return _metaDataTupleBindingFactory;
- }
-
//Package getters for the various databases used by the Store
Database getMetaDataDb()
@@ -2019,65 +1857,9 @@ public class BDBMessageStore implements
return _queueBindingsDb;
}
- void visitMetaDataDb(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
- {
- visitDatabase(_messageMetaDataDb, visitor);
- }
-
- void visitContentDb(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
- {
- visitDatabase(_messageContentDb, visitor);
- }
-
- void visitQueues(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
- {
- visitDatabase(_queueDb, visitor);
- }
-
- void visitDelivery(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
+ Environment getEnvironment()
{
- visitDatabase(_deliveryDb, visitor);
- }
-
- void visitExchanges(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
- {
- visitDatabase(_exchangeDb, visitor);
- }
-
- void visitBindings(DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
- {
- visitDatabase(_queueBindingsDb, visitor);
- }
-
- /**
- * Generic visitDatabase allows iteration through the specified database.
- *
- * @param database The database to visit
- * @param visitor The visitor to give each entry to.
- *
- * @throws DatabaseException If there is a problem with the Database structure
- * @throws AMQStoreException If there is a problem with the Database contents
- */
- void visitDatabase(Database database, DatabaseVisitor visitor) throws DatabaseException, AMQStoreException
- {
- Cursor cursor = database.openCursor(null, null);
-
- try
- {
- DatabaseEntry key = new DatabaseEntry();
- DatabaseEntry value = new DatabaseEntry();
- while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
- {
- visitor.visit(key, value);
- }
- }
- finally
- {
- if (cursor != null)
- {
- cursor.close();
- }
- }
+ return _environment;
}
private StoreFuture commit(com.sleepycat.je.Transaction tx, boolean syncCommit) throws DatabaseException
@@ -2279,7 +2061,7 @@ public class BDBMessageStore implements
}
- private class StoredBDBMessage implements StoredMessage
+ private class StoredBDBMessage implements StoredMessage<StorableMessageMetaData>
{
private final long _messageId;
@@ -2376,6 +2158,7 @@ public class BDBMessageStore implements
}
catch (AMQStoreException e)
{
+ // TODO maybe should throw a checked exception, or at least log before throwing
throw new RuntimeException(e);
}
}
@@ -2406,7 +2189,7 @@ public class BDBMessageStore implements
{
_dataRef = new SoftReference<byte[]>(_data);
BDBMessageStore.this.storeMetaData(txn, _messageId, _metaData);
- BDBMessageStore.this.addContent(txn, _messageId, 0,
+ BDBMessageStore.this.addContent(txn, _messageId,
_data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
}
catch(DatabaseException e)
@@ -2486,17 +2269,6 @@ public class BDBMessageStore implements
BDBMessageStore.this.dequeueMessage(_txn, queue, message.getMessageNumber());
}
- public void enqueueMessage(TransactionLogResource queue, long messageId) throws AMQStoreException
- {
- BDBMessageStore.this.enqueueMessage(_txn, queue, messageId);
- }
-
- public void dequeueMessage(TransactionLogResource queue, long messageId) throws AMQStoreException
- {
- BDBMessageStore.this.dequeueMessage(_txn, queue, messageId);
-
- }
-
public void commitTran() throws AMQStoreException
{
BDBMessageStore.this.commitTranImpl(_txn, true);
Copied: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/BindingRecord.java (from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/BindingRecord.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/BindingRecord.java?p2=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/BindingRecord.java&p1=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/BindingRecord.java&r1=1305371&r2=1305809&rev=1305809&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/BindingRecord.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/BindingRecord.java Tue Mar 27 11:04:02 2012
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb.records;
+package org.apache.qpid.server.store.berkeleydb.entry;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
Copied: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/ExchangeRecord.java (from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/ExchangeRecord.java?p2=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/ExchangeRecord.java&p1=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java&r1=1305371&r2=1305809&rev=1305809&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/ExchangeRecord.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/ExchangeRecord.java Tue Mar 27 11:04:02 2012
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb.records;
+package org.apache.qpid.server.store.berkeleydb.entry;
import org.apache.qpid.framing.AMQShortString;
Copied: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java (from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/PreparedTransaction.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java?p2=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java&p1=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/PreparedTransaction.java&r1=1305371&r2=1305809&rev=1305809&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/PreparedTransaction.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/PreparedTransaction.java Tue Mar 27 11:04:02 2012
@@ -19,7 +19,7 @@
*
*/
-package org.apache.qpid.server.store.berkeleydb.records;
+package org.apache.qpid.server.store.berkeleydb.entry;
import org.apache.qpid.server.store.MessageStore;
Copied: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueEntryKey.java (from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueEntryKey.java?p2=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueEntryKey.java&p1=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java&r1=1305371&r2=1305809&rev=1305809&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/QueueEntryKey.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueEntryKey.java Tue Mar 27 11:04:02 2012
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb;
+package org.apache.qpid.server.store.berkeleydb.entry;
import org.apache.qpid.framing.AMQShortString;
@@ -27,23 +27,19 @@ public class QueueEntryKey
private AMQShortString _queueName;
private long _messageId;
-
public QueueEntryKey(AMQShortString queueName, long messageId)
{
_queueName = queueName;
_messageId = messageId;
}
-
public AMQShortString getQueueName()
{
return _queueName;
}
-
public long getMessageId()
{
return _messageId;
}
-
}
Copied: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueRecord.java (from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueRecord.java?p2=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueRecord.java&p1=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java&r1=1305371&r2=1305809&rev=1305809&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/records/QueueRecord.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/QueueRecord.java Tue Mar 27 11:04:02 2012
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb.records;
+package org.apache.qpid.server.store.berkeleydb.entry;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -47,12 +47,12 @@ public class QueueRecord extends Object
{
return _owner;
}
-
+
public boolean isExclusive()
{
return _exclusive;
}
-
+
public void setExclusive(boolean exclusive)
{
_exclusive = exclusive;
Copied: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/Xid.java (from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/Xid.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/Xid.java?p2=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/Xid.java&p1=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/Xid.java&r1=1305371&r2=1305809&rev=1305809&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/keys/Xid.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/entry/Xid.java Tue Mar 27 11:04:02 2012
@@ -19,7 +19,7 @@
*
*/
-package org.apache.qpid.server.store.berkeleydb.keys;
+package org.apache.qpid.server.store.berkeleydb.entry;
public class Xid
{
Copied: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/AMQShortStringBinding.java (from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/AMQShortStringBinding.java?p2=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/AMQShortStringBinding.java&p1=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java&r1=1305371&r2=1305809&rev=1305809&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringTB.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/AMQShortStringBinding.java Tue Mar 27 11:04:02 2012
@@ -18,32 +18,34 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb;
+package org.apache.qpid.server.store.berkeleydb.tuple;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.framing.AMQShortString;
-public class AMQShortStringTB extends TupleBinding
+public class AMQShortStringBinding extends TupleBinding<AMQShortString>
{
- private static final Logger _log = Logger.getLogger(AMQShortStringTB.class);
+ private static final AMQShortStringBinding INSTANCE = new AMQShortStringBinding();
-
- public AMQShortStringTB()
+ public static AMQShortStringBinding getInstance()
{
+ return INSTANCE;
}
- public Object entryToObject(TupleInput tupleInput)
+ /** private constructor forces getInstance instead */
+ private AMQShortStringBinding() { }
+
+ public AMQShortString entryToObject(TupleInput tupleInput)
{
return AMQShortStringEncoding.readShortString(tupleInput);
}
- public void objectToEntry(Object object, TupleOutput tupleOutput)
+ public void objectToEntry(AMQShortString object, TupleOutput tupleOutput)
{
- AMQShortStringEncoding.writeShortString((AMQShortString)object, tupleOutput);
+ AMQShortStringEncoding.writeShortString(object, tupleOutput);
}
-
}
Copied: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ContentBinding.java (from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ContentBinding.java?p2=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ContentBinding.java&p1=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java&r1=1305371&r2=1305809&rev=1305809&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ContentBinding.java Tue Mar 27 11:04:02 2012
@@ -18,35 +18,35 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb;
+package org.apache.qpid.server.store.berkeleydb.tuple;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
-import java.nio.ByteBuffer;
-
-public class ContentTB extends TupleBinding
+public class ContentBinding extends TupleBinding<byte[]>
{
- public Object entryToObject(TupleInput tupleInput)
- {
+ private static final ContentBinding INSTANCE = new ContentBinding();
- final int size = tupleInput.readInt();
- byte[] underlying = new byte[size];
- tupleInput.readFast(underlying);
- return ByteBuffer.wrap(underlying);
+ public static ContentBinding getInstance()
+ {
+ return INSTANCE;
}
- public void objectToEntry(Object object, TupleOutput tupleOutput)
- {
- ByteBuffer src = (ByteBuffer) object;
-
- src = src.slice();
+ /** private constructor forces getInstance instead */
+ private ContentBinding() { }
- byte[] chunkData = new byte[src.limit()];
- src.duplicate().get(chunkData);
+ @Override
+ public byte[] entryToObject(final TupleInput input)
+ {
+ byte[] data = new byte[input.available()];
+ input.read(data);
+ return data;
+ }
- tupleOutput.writeInt(chunkData.length);
- tupleOutput.writeFast(chunkData);
+ @Override
+ public void objectToEntry(final byte[] data, final TupleOutput output)
+ {
+ output.write(data);
}
}
Copied: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ExchangeBinding.java (from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ExchangeBinding.java?p2=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ExchangeBinding.java&p1=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java&r1=1305371&r2=1305809&rev=1305809&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/ExchangeBinding.java Tue Mar 27 11:04:02 2012
@@ -18,39 +18,40 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb;
+package org.apache.qpid.server.store.berkeleydb.tuple;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.entry.ExchangeRecord;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.store.berkeleydb.records.ExchangeRecord;
-public class ExchangeTB extends TupleBinding
+public class ExchangeBinding extends TupleBinding<ExchangeRecord>
{
- private static final Logger _log = Logger.getLogger(ExchangeTB.class);
+ private static final ExchangeBinding INSTANCE = new ExchangeBinding();
- public ExchangeTB()
+ public static ExchangeBinding getInstance()
{
+ return INSTANCE;
}
- public Object entryToObject(TupleInput tupleInput)
- {
+ /** private constructor forces getInstance instead */
+ private ExchangeBinding() { }
+ public ExchangeRecord entryToObject(TupleInput tupleInput)
+ {
AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput);
boolean autoDelete = tupleInput.readBoolean();
-
+
return new ExchangeRecord(name, typeName, autoDelete);
}
- public void objectToEntry(Object object, TupleOutput tupleOutput)
+ public void objectToEntry(ExchangeRecord exchange, TupleOutput tupleOutput)
{
- ExchangeRecord exchange = (ExchangeRecord) object;
-
AMQShortStringEncoding.writeShortString(exchange.getNameShortString(), tupleOutput);
AMQShortStringEncoding.writeShortString(exchange.getType(), tupleOutput);
Copied: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java (from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java?p2=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java&p1=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java&r1=1305371&r2=1305809&rev=1305809&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java Tue Mar 27 11:04:02 2012
@@ -18,8 +18,11 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb.tuples;
+package org.apache.qpid.server.store.berkeleydb.tuple;
+import java.nio.ByteBuffer;
+
+import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
@@ -29,17 +32,26 @@ import org.apache.qpid.server.store.Stor
/**
* Handles the mapping to and from message meta data
*/
-public class MessageMetaDataTB_5 extends MessageMetaDataTB_4
+public class MessageMetaDataBinding extends TupleBinding<StorableMessageMetaData>
{
+ private static final MessageMetaDataBinding INSTANCE = new MessageMetaDataBinding();
+
+ public static MessageMetaDataBinding getInstance()
+ {
+ return INSTANCE;
+ }
+
+ /** private constructor forces getInstance instead */
+ private MessageMetaDataBinding() { }
@Override
- public Object entryToObject(TupleInput tupleInput)
+ public StorableMessageMetaData entryToObject(TupleInput tupleInput)
{
final int bodySize = tupleInput.readInt();
byte[] dataAsBytes = new byte[bodySize];
tupleInput.readFast(dataAsBytes);
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes);
+ ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
buf.position(1);
buf = buf.slice();
MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
@@ -49,14 +61,12 @@ public class MessageMetaDataTB_5 extends
}
@Override
- public void objectToEntry(Object object, TupleOutput tupleOutput)
+ public void objectToEntry(StorableMessageMetaData metaData, TupleOutput tupleOutput)
{
- StorableMessageMetaData metaData = (StorableMessageMetaData) object;
-
final int bodySize = 1 + metaData.getStorableSize();
byte[] underlying = new byte[bodySize];
underlying[0] = (byte) metaData.getType().ordinal();
- java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(underlying);
+ ByteBuffer buf = ByteBuffer.wrap(underlying);
buf.position(1);
buf = buf.slice();
Copied: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java (from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/PreparedTransactionTB.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java?p2=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java&p1=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/PreparedTransactionTB.java&r1=1305371&r2=1305809&rev=1305809&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/PreparedTransactionTB.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/PreparedTransactionBinding.java Tue Mar 27 11:04:02 2012
@@ -19,7 +19,7 @@
*
*/
-package org.apache.qpid.server.store.berkeleydb.tuples;
+package org.apache.qpid.server.store.berkeleydb.tuple;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
@@ -28,9 +28,9 @@ import org.apache.qpid.server.message.En
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.store.berkeleydb.records.PreparedTransaction;
+import org.apache.qpid.server.store.berkeleydb.entry.PreparedTransaction;
-public class PreparedTransactionTB extends TupleBinding<PreparedTransaction>
+public class PreparedTransactionBinding extends TupleBinding<PreparedTransaction>
{
@Override
public PreparedTransaction entryToObject(TupleInput input)
@@ -109,7 +109,7 @@ public class PreparedTransactionTB exten
return true;
}
- public StoredMessage getStoredMessage()
+ public StoredMessage<?> getStoredMessage()
{
throw new UnsupportedOperationException();
}
Copied: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBinding.java (from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBinding.java?p2=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBinding.java&p1=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java&r1=1305371&r2=1305809&rev=1305809&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBinding.java Tue Mar 27 11:04:02 2012
@@ -18,8 +18,9 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb.tuples;
+package org.apache.qpid.server.store.berkeleydb.tuple;
+import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import com.sleepycat.je.DatabaseException;
@@ -29,17 +30,22 @@ import org.apache.qpid.framing.AMQShortS
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
-import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
+import org.apache.qpid.server.store.berkeleydb.entry.QueueRecord;
-public class QueueTuple_5 extends QueueTuple_4
+public class QueueBinding extends TupleBinding<QueueRecord>
{
- private static final Logger _logger = Logger.getLogger(QueueTuple_5.class);
+ private static final Logger _logger = Logger.getLogger(QueueBinding.class);
- public QueueTuple_5()
+ private static final QueueBinding INSTANCE = new QueueBinding();
+
+ public static QueueBinding getInstance()
{
- super();
+ return INSTANCE;
}
+ /** private constructor forces getInstance instead */
+ private QueueBinding() { }
+
public QueueRecord entryToObject(TupleInput tupleInput)
{
try
Copied: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBindingTupleBinding.java (from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBindingTupleBinding.java?p2=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBindingTupleBinding.java&p1=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java&r1=1305371&r2=1305809&rev=1305809&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/BindingTuple_4.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueBindingTupleBinding.java Tue Mar 27 11:04:02 2012
@@ -18,29 +18,34 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb.tuples;
+package org.apache.qpid.server.store.berkeleydb.tuple;
-import com.sleepycat.bind.tuple.TupleBinding;
-import com.sleepycat.bind.tuple.TupleInput;
-import com.sleepycat.bind.tuple.TupleOutput;
-import com.sleepycat.je.DatabaseException;
import org.apache.log4j.Logger;
-
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
-import org.apache.qpid.server.store.berkeleydb.records.BindingRecord;
+import org.apache.qpid.server.store.berkeleydb.entry.BindingRecord;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.DatabaseException;
-public class BindingTuple_4 extends TupleBinding<BindingRecord> implements BindingTuple
+public class QueueBindingTupleBinding extends TupleBinding<BindingRecord>
{
- protected static final Logger _log = Logger.getLogger(BindingTuple.class);
+ protected static final Logger _log = Logger.getLogger(QueueBindingTupleBinding.class);
+
+ private static final QueueBindingTupleBinding INSTANCE = new QueueBindingTupleBinding();
- public BindingTuple_4()
+ public static QueueBindingTupleBinding getInstance()
{
- super();
+ return INSTANCE;
}
+ /** private constructor forces getInstance instead */
+ private QueueBindingTupleBinding() { }
+
public BindingRecord entryToObject(TupleInput tupleInput)
{
AMQShortString exchangeName = AMQShortStringEncoding.readShortString(tupleInput);
Copied: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueEntryBinding.java (from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueEntryBinding.java?p2=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueEntryBinding.java&p1=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java&r1=1305371&r2=1305809&rev=1305809&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/QueueEntryBinding.java Tue Mar 27 11:04:02 2012
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb.tuples;
+package org.apache.qpid.server.store.berkeleydb.tuple;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
@@ -26,10 +26,21 @@ import com.sleepycat.bind.tuple.TupleOut
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.server.store.berkeleydb.QueueEntryKey;
+import org.apache.qpid.server.store.berkeleydb.entry.QueueEntryKey;
-public class QueueEntryTB extends TupleBinding<QueueEntryKey>
+public class QueueEntryBinding extends TupleBinding<QueueEntryKey>
{
+
+ private static final QueueEntryBinding INSTANCE = new QueueEntryBinding();
+
+ public static QueueEntryBinding getInstance()
+ {
+ return INSTANCE;
+ }
+
+ /** private constructor forces getInstance instead */
+ private QueueEntryBinding() { }
+
public QueueEntryKey entryToObject(TupleInput tupleInput)
{
AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
Copied: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/StringMapBinding.java (from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StringMapBinding.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/StringMapBinding.java?p2=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/StringMapBinding.java&p1=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StringMapBinding.java&r1=1305371&r2=1305809&rev=1305809&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StringMapBinding.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/StringMapBinding.java Tue Mar 27 11:04:02 2012
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb;
+package org.apache.qpid.server.store.berkeleydb.tuple;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
@@ -29,9 +29,8 @@ import java.util.Map;
public class StringMapBinding extends TupleBinding<Map<String,String>>
{
-
private static final StringMapBinding INSTANCE = new StringMapBinding();
-
+
public Map<String, String> entryToObject(final TupleInput tupleInput)
{
int entries = tupleInput.readInt();
@@ -43,7 +42,6 @@ public class StringMapBinding extends Tu
return map;
}
-
public void objectToEntry(final Map<String, String> stringStringMap, final TupleOutput tupleOutput)
{
tupleOutput.writeInt(stringStringMap.size());
Copied: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/UUIDTupleBinding.java (from r1305371, qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/UUIDTupleBinding.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/UUIDTupleBinding.java?p2=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/UUIDTupleBinding.java&p1=qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/UUIDTupleBinding.java&r1=1305371&r2=1305809&rev=1305809&view=diff
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/UUIDTupleBinding.java (original)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/UUIDTupleBinding.java Tue Mar 27 11:04:02 2012
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.store.berkeleydb;
+package org.apache.qpid.server.store.berkeleydb.tuple;
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
@@ -29,7 +29,7 @@ import java.util.UUID;
public class UUIDTupleBinding extends TupleBinding<UUID>
{
private static final UUIDTupleBinding INSTANCE = new UUIDTupleBinding();
-
+
public UUID entryToObject(final TupleInput tupleInput)
{
return new UUID(tupleInput.readLong(), tupleInput.readLong());
@@ -38,13 +38,11 @@ public class UUIDTupleBinding extends Tu
public void objectToEntry(final UUID uuid, final TupleOutput tupleOutput)
{
tupleOutput.writeLong(uuid.getMostSignificantBits());
- tupleOutput.writeLong(uuid.getLeastSignificantBits());
+ tupleOutput.writeLong(uuid.getLeastSignificantBits());
}
public static UUIDTupleBinding getInstance()
{
return INSTANCE;
}
-
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org