You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2011/09/24 22:16:02 UTC

svn commit: r1175235 [4/5] - in /qpid/trunk: ./ qpid/java/ qpid/java/bdbstore/ qpid/java/bdbstore/bin/ qpid/java/bdbstore/etc/ qpid/java/bdbstore/etc/scripts/ qpid/java/bdbstore/src/ qpid/java/bdbstore/src/main/ qpid/java/bdbstore/src/main/java/ qpid/j...

Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,162 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+
+import java.io.*;
+
+/**
+ * Handles the mapping to and from 0-8/0-9 message meta data
+ */
+public class MessageMetaDataTB_4 extends TupleBinding<Object>
+{
+    private static final Logger _log = Logger.getLogger(MessageMetaDataTB_4.class);
+
+    public MessageMetaDataTB_4()
+    {
+    }
+
+    public Object entryToObject(TupleInput tupleInput)
+    {
+        try
+        {
+            final MessagePublishInfo publishBody = readMessagePublishInfo(tupleInput);
+            final ContentHeaderBody contentHeaderBody = readContentHeaderBody(tupleInput);
+            final int contentChunkCount = tupleInput.readInt();
+
+            return new MessageMetaData(publishBody, contentHeaderBody, contentChunkCount);
+        }
+        catch (Exception e)
+        {
+            _log.error("Error converting entry to object: " + e, e);
+            // annoyingly just have to return null since we cannot throw
+            return null;
+        }
+    }
+
+    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    {
+        MessageMetaData message = (MessageMetaData) object;
+        try
+        {
+            writeMessagePublishInfo(message.getMessagePublishInfo(), tupleOutput);
+        }
+        catch (AMQException e)
+        {
+            // can't do anything else since the BDB interface precludes throwing any exceptions
+            // in practice we should never get an exception
+            throw new RuntimeException("Error converting object to entry: " + e, e);
+        }
+        writeContentHeader(message.getContentHeaderBody(), tupleOutput);
+        tupleOutput.writeInt(message.getContentChunkCount());
+    }
+
+    private MessagePublishInfo readMessagePublishInfo(TupleInput tupleInput)
+    {
+
+        final AMQShortString exchange = AMQShortStringEncoding.readShortString(tupleInput);
+        final AMQShortString routingKey = AMQShortStringEncoding.readShortString(tupleInput);
+        final boolean mandatory = tupleInput.readBoolean();
+        final boolean immediate = tupleInput.readBoolean();
+
+        return new MessagePublishInfo()
+        {
+
+            public AMQShortString getExchange()
+            {
+                return exchange;
+            }
+
+            public void setExchange(AMQShortString exchange)
+            {
+
+            }
+
+            public boolean isImmediate()
+            {
+                return immediate;
+            }
+
+            public boolean isMandatory()
+            {
+                return mandatory;
+            }
+
+            public AMQShortString getRoutingKey()
+            {
+                return routingKey;
+            }
+        }   ;
+
+    }
+
+    private ContentHeaderBody readContentHeaderBody(TupleInput tupleInput) throws AMQFrameDecodingException, AMQProtocolVersionException
+    {
+        int bodySize = tupleInput.readInt();
+        byte[] underlying = new byte[bodySize];
+        tupleInput.readFast(underlying);
+
+        try
+        {
+            return ContentHeaderBody.createFromBuffer(new DataInputStream(new ByteArrayInputStream(underlying)), bodySize);
+        }
+        catch (IOException e)
+        {
+            throw new AMQFrameDecodingException(null, e.getMessage(), e);
+        }
+    }
+
+    private void writeMessagePublishInfo(MessagePublishInfo publishBody, TupleOutput tupleOutput) throws AMQException
+    {
+
+        AMQShortStringEncoding.writeShortString(publishBody.getExchange(), tupleOutput);
+        AMQShortStringEncoding.writeShortString(publishBody.getRoutingKey(), tupleOutput);
+        tupleOutput.writeBoolean(publishBody.isMandatory());
+        tupleOutput.writeBoolean(publishBody.isImmediate());
+    }
+
+    private void writeContentHeader(ContentHeaderBody headerBody, TupleOutput tupleOutput)
+    {
+        // write out the content header body
+        final int bodySize = headerBody.getSize();
+        ByteArrayOutputStream baos = new ByteArrayOutputStream(bodySize);
+        try
+        {
+            headerBody.writePayload(new DataOutputStream(baos));
+            tupleOutput.writeInt(bodySize);
+            tupleOutput.writeFast(baos.toByteArray());
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+
+    }
+}

Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.MessageMetaDataType;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+
+/**
+ * Handles the mapping to and from message meta data
+ */
+public class MessageMetaDataTB_5 extends MessageMetaDataTB_4
+{
+    private static final Logger _log = Logger.getLogger(MessageMetaDataTB_5.class);
+
+    @Override
+    public Object entryToObject(TupleInput tupleInput)
+    {
+        try
+        {
+            final int bodySize = tupleInput.readInt();
+            byte[] dataAsBytes = new byte[bodySize];
+            tupleInput.readFast(dataAsBytes);
+
+            java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(dataAsBytes);
+            buf.position(1);
+            buf = buf.slice();
+            MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
+            StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
+
+            return metaData;
+        }
+        catch (Exception e)
+        {
+            _log.error("Error converting entry to object: " + e, e);
+            // annoyingly just have to return null since we cannot throw
+            return null;
+        }
+    }
+
+    @Override
+    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    {
+        StorableMessageMetaData metaData = (StorableMessageMetaData) object;
+
+        final int bodySize = 1 + metaData.getStorableSize();
+        byte[] underlying = new byte[bodySize];
+        underlying[0] = (byte) metaData.getType().ordinal();
+        java.nio.ByteBuffer buf = java.nio.ByteBuffer.wrap(underlying);
+        buf.position(1);
+        buf = buf.slice();
+
+        metaData.writeToBuffer(0, buf);
+        tupleOutput.writeInt(bodySize);
+        tupleOutput.writeFast(underlying);
+    }
+}

Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+
+public class MessageMetaDataTupleBindingFactory extends TupleBindingFactory<Object>
+{
+    public MessageMetaDataTupleBindingFactory(int version)
+    {
+        super(version);
+    }
+
+    public TupleBinding<Object> getInstance()
+    {
+        switch (_version)
+        {
+            default:
+            case 5:
+                return new MessageMetaDataTB_5();
+            case 4:
+                return new MessageMetaDataTB_4();
+        }
+    }
+}

Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.QueueEntryKey;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+public class QueueEntryTB extends TupleBinding<QueueEntryKey>
+{
+    public QueueEntryKey entryToObject(TupleInput tupleInput)
+    {
+        AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
+        Long messageId = tupleInput.readLong();
+
+        return new QueueEntryKey(queueName, messageId);
+    }
+
+    public void objectToEntry(QueueEntryKey mk, TupleOutput tupleOutput)
+    {
+        AMQShortStringEncoding.writeShortString(mk.getQueueName(),tupleOutput);
+        tupleOutput.writeLong(mk.getMessageId());
+    }
+}
\ No newline at end of file

Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,25 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+public interface QueueTuple
+{
+}

Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+
+public class QueueTupleBindingFactory extends TupleBindingFactory<QueueRecord>
+{
+
+    public QueueTupleBindingFactory(int version)
+    {
+        super(version);
+    }
+
+    public TupleBinding<QueueRecord> getInstance()
+    {
+        switch (_version)
+        {
+            default:
+            case 5:
+                return new QueueTuple_5();
+            case 4:
+                return new QueueTuple_4();
+        }
+    }
+}

Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.DatabaseException;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+public class QueueTuple_4 extends TupleBinding<QueueRecord> implements QueueTuple
+{
+    protected static final Logger _logger = Logger.getLogger(QueueTuple_4.class);
+
+    protected FieldTable _arguments;
+
+    public QueueTuple_4()
+    {
+        super();
+    }
+
+    public QueueRecord entryToObject(TupleInput tupleInput)
+    {
+        try
+        {
+            AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
+            AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
+            // Addition for Version 2 of this table, read the queue arguments
+            FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
+
+            return new QueueRecord(name, owner, false, arguments);
+        }
+        catch (DatabaseException e)
+        {
+            _logger.error("Unable to create binding: " + e, e);
+            return null;
+        }
+
+    }
+
+    public void objectToEntry(QueueRecord queue, TupleOutput tupleOutput)
+    {
+        AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput);
+        AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
+        // Addition for Version 2 of this table, store the queue arguments
+        FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput);
+    }
+}

Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.DatabaseException;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+public class QueueTuple_5 extends QueueTuple_4
+{
+    protected static final Logger _logger = Logger.getLogger(QueueTuple_5.class);
+
+    protected FieldTable _arguments;
+
+    public QueueTuple_5()
+    {
+        super();
+    }
+
+    public QueueRecord entryToObject(TupleInput tupleInput)
+    {
+        try
+        {
+            AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
+            AMQShortString owner = AMQShortStringEncoding.readShortString(tupleInput);
+            // Addition for Version 2 of this table, read the queue arguments
+            FieldTable arguments = FieldTableEncoding.readFieldTable(tupleInput);
+            // Addition for Version 3 of this table, read the queue exclusivity
+            boolean exclusive = tupleInput.readBoolean();
+
+            return new QueueRecord(name, owner, exclusive, arguments);
+        }
+        catch (DatabaseException e)
+        {
+            _logger.error("Unable to create binding: " + e, e);
+            return null;
+        }
+
+    }
+
+    public void objectToEntry(QueueRecord queue, TupleOutput tupleOutput)
+    {
+        AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput);
+        AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
+        // Addition for Version 2 of this table, store the queue arguments
+        FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput);
+        // Addition for Version 3 of this table, store the queue exclusivity
+        tupleOutput.writeBoolean(queue.isExclusive());
+    }
+}

Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+
+public abstract class TupleBindingFactory<E>
+{
+    protected int _version;
+
+    public TupleBindingFactory(int version)
+    {
+        _version = version;
+    }
+
+    public abstract TupleBinding<E> getInstance();
+}

Added: qpid/trunk/qpid/java/bdbstore/src/resources/backup-log4j.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/resources/backup-log4j.xml?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/resources/backup-log4j.xml (added)
+++ qpid/trunk/qpid/java/bdbstore/src/resources/backup-log4j.xml Sat Sep 24 20:16:00 2011
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements.  See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership.  The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License.  You may obtain a copy of the License at
+ -
+ -   http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied.  See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<!-- =============================================================================== -->
+<!--  This is a Log4j configuration specially created for the BDB Backup utility,    -->
+<!-- it outputs logging to the console for specifically designated console loggers   -->
+<!-- at info level or above only. This avoids spamming the user with any internals   -->
+<!-- of the Qpid code.                                                               -->
+<!--  Use a different logging set up to capture debugging output to diagnose errors. -->
+<!-- =============================================================================== -->
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+
+    <!-- ====================================================== -->
+    <!-- Append messages to the console at info level or above. -->
+    <!-- ====================================================== -->
+
+    <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+        <param name="Target" value="System.out"/>
+        <param name="Threshold" value="info"/>
+
+        <layout class="org.apache.log4j.PatternLayout">
+            <!-- The default pattern: Date Priority [Category] Message\n -->
+            <param name="ConversionPattern" value="%m%n"/>
+        </layout>
+
+    </appender>
+
+    <!-- ================ -->
+    <!-- Limit categories -->
+    <!-- ================ -->
+
+    <category name="org.apache.qpid.server.store.berkeleydb.BDBBackup">
+        <priority value="info"/>
+    </category>
+
+    <!-- ======================= -->
+    <!-- Setup the Root category -->
+    <!-- ======================= -->
+
+    <root>
+        <appender-ref ref="CONSOLE"/>
+    </root>
+
+</log4j:configuration>

Added: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.framing.AMQShortString;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests for {@code AMQShortStringEncoding} including corner cases when string
+ * is null or over 127 characters in length
+ */
+public class AMQShortStringEncodingTest extends TestCase
+{
+
+    public void testWriteReadNullValues()
+    {
+        // write into tuple output
+        TupleOutput tupleOutput = new TupleOutput();
+        AMQShortStringEncoding.writeShortString(null, tupleOutput);
+        byte[] data = tupleOutput.getBufferBytes();
+
+        // read from tuple input
+        TupleInput tupleInput = new TupleInput(data);
+        AMQShortString result = AMQShortStringEncoding.readShortString(tupleInput);
+        assertNull("Expected null but got " + result, result);
+    }
+
+    public void testWriteReadShortStringWithLengthOver127()
+    {
+        AMQShortString value = createString('a', 128);
+
+        // write into tuple output
+        TupleOutput tupleOutput = new TupleOutput();
+        AMQShortStringEncoding.writeShortString(value, tupleOutput);
+        byte[] data = tupleOutput.getBufferBytes();
+
+        // read from tuple input
+        TupleInput tupleInput = new TupleInput(data);
+        AMQShortString result = AMQShortStringEncoding.readShortString(tupleInput);
+        assertEquals("Expected " + value + " but got " + result, value, result);
+    }
+
+    public void testWriteReadShortStringWithLengthLess127()
+    {
+        AMQShortString value = new AMQShortString("test");
+
+        // write into tuple output
+        TupleOutput tupleOutput = new TupleOutput();
+        AMQShortStringEncoding.writeShortString(value, tupleOutput);
+        byte[] data = tupleOutput.getBufferBytes();
+
+        // read from tuple input
+        TupleInput tupleInput = new TupleInput(data);
+        AMQShortString result = AMQShortStringEncoding.readShortString(tupleInput);
+        assertEquals("Expected " + value + " but got " + result, value, result);
+    }
+
+    private AMQShortString createString(char ch, int length)
+    {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < length; i++)
+        {
+            sb.append(ch);
+        }
+        return new AMQShortString(sb.toString());
+    }
+
+}

Added: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,470 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.message.MessageMetaData_0_10;
+import org.apache.qpid.server.store.MessageMetaDataType;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageTransfer;
+
+/**
+ * Subclass of MessageStoreTest which runs the standard tests from the superclass against
+ * the BDB Store as well as additional tests specific to the DBB store-implementation.
+ */
+public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageStoreTest
+{
+    /**
+     * Tests that message metadata and content are successfully read back from a 
+     * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to 
+     * verify their ability to co-exist within the store and be successful retrieved.
+     */
+    public void testBDBMessagePersistence() throws Exception
+    {
+        MessageStore store = getVirtualHost().getMessageStore();
+
+        BDBMessageStore bdbStore = assertBDBStore(store);
+
+        // Create content ByteBuffers.
+        // Split the content into 2 chunks for the 0-8 message, as per broker behaviour.
+        // Use a single chunk for the 0-10 message as per broker behaviour.
+        String bodyText = "jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf";
+        
+        ByteBuffer firstContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(0, 10).getBytes());
+        ByteBuffer secondContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(10).getBytes());
+        
+        ByteBuffer completeContentBody_0_10 = ByteBuffer.wrap(bodyText.getBytes());
+        int bodySize = completeContentBody_0_10.limit();
+
+        /*
+         * Create and insert a 0-8 message (metadata and multi-chunk content)
+         */
+        MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8();
+        BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8();
+
+        ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize);
+
+        MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0);
+        StoredMessage<MessageMetaData> storedMessage_0_8 = bdbStore.addMessage(messageMetaData_0_8);
+
+        long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime();
+        long messageid_0_8 = storedMessage_0_8.getMessageNumber();
+
+        storedMessage_0_8.addContent(0, firstContentBytes_0_8);
+        storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8);
+        storedMessage_0_8.flushToStore();
+
+        /*
+         * Create and insert a 0-10 message (metadata and content)
+         */        
+        MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize);
+        DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10();
+        Header header_0_10 = new Header(msgProps_0_10, delProps_0_10);
+
+        MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT, 
+                MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10);
+
+        MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10);
+        StoredMessage<MessageMetaData_0_10> storedMessage_0_10 = bdbStore.addMessage(messageMetaData_0_10);
+
+        long origArrivalTime_0_10 = messageMetaData_0_10.getArrivalTime();
+        long messageid_0_10 = storedMessage_0_10.getMessageNumber();
+
+        storedMessage_0_10.addContent(0, completeContentBody_0_10);
+        storedMessage_0_10.flushToStore();
+
+        /*
+         * reload the store only (read-only)
+         */
+        bdbStore = reloadStoreReadOnly(bdbStore);
+
+        /*
+         * Read back and validate the 0-8 message metadata and content
+         */
+        StorableMessageMetaData storeableMMD_0_8 = bdbStore.getMessageMetaData(messageid_0_8);
+
+        assertEquals("Unexpected message type",MessageMetaDataType.META_DATA_0_8, storeableMMD_0_8.getType());
+        assertTrue("Unexpected instance type", storeableMMD_0_8 instanceof MessageMetaData);
+        MessageMetaData returnedMMD_0_8 = (MessageMetaData) storeableMMD_0_8;
+
+        assertEquals("Message arrival time has changed", origArrivalTime_0_8, returnedMMD_0_8.getArrivalTime());
+
+        MessagePublishInfo returnedPubBody_0_8 = returnedMMD_0_8.getMessagePublishInfo();
+        assertEquals("Message exchange has changed", pubInfoBody_0_8.getExchange(), returnedPubBody_0_8.getExchange());
+        assertEquals("Immediate flag has changed", pubInfoBody_0_8.isImmediate(), returnedPubBody_0_8.isImmediate());
+        assertEquals("Mandatory flag has changed", pubInfoBody_0_8.isMandatory(), returnedPubBody_0_8.isMandatory());
+        assertEquals("Routing key has changed", pubInfoBody_0_8.getRoutingKey(), returnedPubBody_0_8.getRoutingKey());
+
+        ContentHeaderBody returnedHeaderBody_0_8 = returnedMMD_0_8.getContentHeaderBody();
+        assertEquals("ContentHeader ClassID has changed", chb_0_8.classId, returnedHeaderBody_0_8.classId);
+        assertEquals("ContentHeader weight has changed", chb_0_8.weight, returnedHeaderBody_0_8.weight);
+        assertEquals("ContentHeader bodySize has changed", chb_0_8.bodySize, returnedHeaderBody_0_8.bodySize);
+
+        BasicContentHeaderProperties returnedProperties_0_8 = (BasicContentHeaderProperties) returnedHeaderBody_0_8.getProperties();
+        assertEquals("Property ContentType has changed", props_0_8.getContentTypeAsString(), returnedProperties_0_8.getContentTypeAsString());
+        assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString());
+
+        ByteBuffer recoveredContent_0_8 = ByteBuffer.allocate((int) chb_0_8.bodySize) ;
+        long recoveredCount_0_8 = bdbStore.getContent(messageid_0_8, 0, recoveredContent_0_8);
+        assertEquals("Incorrect amount of payload data recovered", chb_0_8.bodySize, recoveredCount_0_8);
+        String returnedPayloadString_0_8 = new String(recoveredContent_0_8.array());
+        assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_8);
+
+        /*
+         * Read back and validate the 0-10 message metadata and content
+         */
+        StorableMessageMetaData storeableMMD_0_10 = bdbStore.getMessageMetaData(messageid_0_10);
+
+        assertEquals("Unexpected message type",MessageMetaDataType.META_DATA_0_10, storeableMMD_0_10.getType());
+        assertTrue("Unexpected instance type", storeableMMD_0_10 instanceof MessageMetaData_0_10);
+        MessageMetaData_0_10 returnedMMD_0_10 = (MessageMetaData_0_10) storeableMMD_0_10;
+
+        assertEquals("Message arrival time has changed", origArrivalTime_0_10, returnedMMD_0_10.getArrivalTime());
+
+        DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().get(DeliveryProperties.class);
+        assertNotNull("DeliveryProperties were not returned", returnedDelProps_0_10);
+        assertEquals("Immediate flag has changed", delProps_0_10.getImmediate(), returnedDelProps_0_10.getImmediate());
+        assertEquals("Routing key has changed", delProps_0_10.getRoutingKey(), returnedDelProps_0_10.getRoutingKey());
+        assertEquals("Message exchange has changed", delProps_0_10.getExchange(), returnedDelProps_0_10.getExchange());
+        assertEquals("Message expiration has changed", delProps_0_10.getExpiration(), returnedDelProps_0_10.getExpiration());
+        assertEquals("Message delivery priority has changed", delProps_0_10.getPriority(), returnedDelProps_0_10.getPriority());
+
+        MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().get(MessageProperties.class);
+        assertNotNull("MessageProperties were not returned", returnedMsgProps);
+        assertTrue("Message correlationID has changed", Arrays.equals(msgProps_0_10.getCorrelationId(), returnedMsgProps.getCorrelationId()));
+        assertEquals("Message content length has changed", msgProps_0_10.getContentLength(), returnedMsgProps.getContentLength());
+        assertEquals("Message content type has changed", msgProps_0_10.getContentType(), returnedMsgProps.getContentType());
+
+        ByteBuffer recoveredContent = ByteBuffer.allocate((int) msgProps_0_10.getContentLength()) ;
+        long recoveredCount = bdbStore.getContent(messageid_0_10, 0, recoveredContent);
+        assertEquals("Incorrect amount of payload data recovered", msgProps_0_10.getContentLength(), recoveredCount);
+
+        String returnedPayloadString_0_10 = new String(recoveredContent.array());
+        assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10);
+    }
+
+    private DeliveryProperties createDeliveryProperties_0_10()
+    {
+        DeliveryProperties delProps_0_10 = new DeliveryProperties();
+        
+        delProps_0_10.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
+        delProps_0_10.setImmediate(true);
+        delProps_0_10.setExchange("exchange12345");
+        delProps_0_10.setRoutingKey("routingKey12345");
+        delProps_0_10.setExpiration(5);
+        delProps_0_10.setPriority(MessageDeliveryPriority.ABOVE_AVERAGE);
+        
+        return delProps_0_10;
+    }
+
+    private MessageProperties createMessageProperties_0_10(int bodySize)
+    {
+        MessageProperties msgProps_0_10 = new MessageProperties();
+        msgProps_0_10.setContentLength(bodySize);
+        msgProps_0_10.setCorrelationId("qwerty".getBytes());
+        msgProps_0_10.setContentType("text/html");
+        
+        return msgProps_0_10;
+    }
+
+    /** 
+     * Close the provided store and create a new (read-only) store to read back the data.
+     * 
+     * Use this method instead of reloading the virtual host like other tests in order 
+     * to avoid the recovery handler deleting the message for not being on a queue.
+     */
+    private BDBMessageStore reloadStoreReadOnly(BDBMessageStore messageStore) throws Exception
+    {
+        messageStore.close();
+        File storePath = new File(String.valueOf(_config.getProperty("store.environment-path")));
+
+        BDBMessageStore newStore = new BDBMessageStore();
+        newStore.configure(storePath, false);
+        newStore.start();
+
+        return newStore;
+    }
+
+    private MessagePublishInfo createPublishInfoBody_0_8()
+    {
+        return new MessagePublishInfo()
+        {
+            public AMQShortString getExchange()
+            {
+                return new AMQShortString("exchange12345");
+            }
+
+            public void setExchange(AMQShortString exchange)
+            {
+            }
+
+            public boolean isImmediate()
+            {
+                return false;
+            }
+
+            public boolean isMandatory()
+            {
+                return true;
+            }
+
+            public AMQShortString getRoutingKey()
+            {
+                return new AMQShortString("routingKey12345");
+            }
+        };
+
+    }
+
+    private ContentHeaderBody createContentHeaderBody_0_8(BasicContentHeaderProperties props, int length)
+    {
+        MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+        int classForBasic = methodRegistry.createBasicQosOkBody().getClazz();
+        return new ContentHeaderBody(classForBasic, 1, props, length);
+    }
+
+    private BasicContentHeaderProperties createContentHeaderProperties_0_8()
+    {
+        BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+        props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue());
+        props.setContentType("text/html");
+        props.getHeaders().setString("Test", "MST");
+        return props;
+    }
+    
+    /**
+     * Tests that messages which are added to the store and then removed using the
+     * public MessageStore interfaces are actually removed from the store by then
+     * interrogating the store with its own implementation methods and verifying
+     * expected exceptions are thrown to indicate the message is not present.
+     */
+    public void testMessageCreationAndRemoval() throws Exception
+    {
+        MessageStore store = getVirtualHost().getMessageStore();
+        BDBMessageStore bdbStore = assertBDBStore(store);
+
+        StoredMessage<MessageMetaData> storedMessage_0_8 = createAndStoreMultiChunkMessage_0_8(store);
+        long messageid_0_8 = storedMessage_0_8.getMessageNumber();
+        
+        //remove the message in the fashion the broker normally would
+        storedMessage_0_8.remove();
+
+        //verify the removal using the BDB store implementation methods directly
+        try
+        {
+            // the next line should throw since the message id should not be found
+            bdbStore.getMessageMetaData(messageid_0_8);
+            fail("No exception thrown when message id not found getting metadata");
+        }
+        catch (AMQStoreException e)
+        {
+            // pass since exception expected
+        }
+
+        //expecting no content, allocate a 1 byte
+        ByteBuffer dst = ByteBuffer.allocate(1);
+
+        assertEquals("Retrieved content when none was expected", 
+                        0, bdbStore.getContent(messageid_0_8, 0, dst));
+    }
+    
+    private BDBMessageStore assertBDBStore(Object store)
+    {
+        if(!(store instanceof BDBMessageStore))
+        {
+            fail("Test requires an instance of BDBMessageStore to proceed");
+        }
+
+        return (BDBMessageStore) store;
+    }
+
+    private StoredMessage<MessageMetaData> createAndStoreMultiChunkMessage_0_8(MessageStore store)
+    {
+        byte[] body10Bytes = "0123456789".getBytes();
+        byte[] body5Bytes = "01234".getBytes();
+
+        ByteBuffer chunk1 = ByteBuffer.wrap(body10Bytes);
+        ByteBuffer chunk2 = ByteBuffer.wrap(body5Bytes);
+
+        int bodySize = body10Bytes.length + body5Bytes.length;
+
+        //create and store the message using the MessageStore interface
+        MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8();
+        BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8();
+
+        ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize);
+
+        MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0);
+        StoredMessage<MessageMetaData> storedMessage_0_8 = store.addMessage(messageMetaData_0_8);
+
+        storedMessage_0_8.addContent(0, chunk1);
+        storedMessage_0_8.addContent(chunk1.limit(), chunk2);
+        storedMessage_0_8.flushToStore();
+
+        return storedMessage_0_8;
+    }
+
+    /**
+     * Tests transaction commit by utilising the enqueue and dequeue methods available
+     * in the TransactionLog interface implemented by the store, and verifying the
+     * behaviour using BDB implementation methods.
+     */
+    public void testTranCommit() throws Exception
+    {
+        TransactionLog log = getVirtualHost().getTransactionLog();
+
+        BDBMessageStore bdbStore = assertBDBStore(log);
+
+        final AMQShortString mockQueueName = new AMQShortString("queueName");
+        
+        TransactionLogResource mockQueue = new TransactionLogResource()
+        {
+            public String getResourceName()
+            {
+                return mockQueueName.asString();
+            }
+        };
+        
+        TransactionLog.Transaction txn = log.newTransaction();
+        
+        txn.enqueueMessage(mockQueue, 1L);
+        txn.enqueueMessage(mockQueue, 5L);
+        txn.commitTran();
+
+        List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
+        
+        assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+        Long val = enqueuedIds.get(0);
+        assertEquals("First Message is incorrect", 1L, val.longValue());
+        val = enqueuedIds.get(1);
+        assertEquals("Second Message is incorrect", 5L, val.longValue());
+    }
+    
+    
+    /**
+     * Tests transaction rollback before a commit has occurred by utilising the 
+     * enqueue and dequeue methods available in the TransactionLog interface 
+     * implemented by the store, and verifying the behaviour using BDB 
+     * implementation methods.
+     */
+    public void testTranRollbackBeforeCommit() throws Exception
+    {
+        TransactionLog log = getVirtualHost().getTransactionLog();
+
+        BDBMessageStore bdbStore = assertBDBStore(log);
+
+        final AMQShortString mockQueueName = new AMQShortString("queueName");
+        
+        TransactionLogResource mockQueue = new TransactionLogResource()
+        {
+            public String getResourceName()
+            {
+                return mockQueueName.asString();
+            }
+        };
+        
+        TransactionLog.Transaction txn = log.newTransaction();
+        
+        txn.enqueueMessage(mockQueue, 21L);
+        txn.abortTran();
+        
+        txn = log.newTransaction();
+        txn.enqueueMessage(mockQueue, 22L);
+        txn.enqueueMessage(mockQueue, 23L);
+        txn.commitTran();
+
+        List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
+        
+        assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+        Long val = enqueuedIds.get(0);
+        assertEquals("First Message is incorrect", 22L, val.longValue());
+        val = enqueuedIds.get(1);
+        assertEquals("Second Message is incorrect", 23L, val.longValue());
+    }
+    
+    /**
+     * Tests transaction rollback after a commit has occurred by utilising the 
+     * enqueue and dequeue methods available in the TransactionLog interface 
+     * implemented by the store, and verifying the behaviour using BDB 
+     * implementation methods.
+     */
+    public void testTranRollbackAfterCommit() throws Exception
+    {
+        TransactionLog log = getVirtualHost().getTransactionLog();
+
+        BDBMessageStore bdbStore = assertBDBStore(log);
+
+        final AMQShortString mockQueueName = new AMQShortString("queueName");
+        
+        TransactionLogResource mockQueue = new TransactionLogResource()
+        {
+            public String getResourceName()
+            {
+                return mockQueueName.asString();
+            }
+        };
+        
+        TransactionLog.Transaction txn = log.newTransaction();
+        
+        txn.enqueueMessage(mockQueue, 30L);
+        txn.commitTran();
+
+        txn = log.newTransaction();
+        txn.enqueueMessage(mockQueue, 31L);
+        txn.abortTran();
+        
+        txn = log.newTransaction();
+        txn.enqueueMessage(mockQueue, 32L);
+        txn.commitTran();
+        
+        List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(mockQueueName);
+        
+        assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+        Long val = enqueuedIds.get(0);
+        assertEquals("First Message is incorrect", 30L, val.longValue());
+        val = enqueuedIds.get(1);
+        assertEquals("Second Message is incorrect", 32L, val.longValue());
+    }
+
+}

Added: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,232 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * Prepares an older version brokers BDB store with the required 
+ * contents for use in the BDBStoreUpgradeTest. 
+ * 
+ * The store will then be used to verify that the upgraded is 
+ * completed properly and that once upgraded it functions as 
+ * expected with the new broker.
+ */
+public class BDBStoreUpgradeTestPreparer extends TestCase
+{
+    public static final String TOPIC_NAME="myUpgradeTopic";
+    public static final String SUB_NAME="myDurSubName";    
+    public static final String QUEUE_NAME="myUpgradeQueue";
+
+    private static AMQConnectionFactory _connFac;
+    private static final String CONN_URL = 
+        "amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'";
+
+    /**
+     * Create a BDBStoreUpgradeTestPreparer instance
+     */
+    public BDBStoreUpgradeTestPreparer () throws URLSyntaxException
+    {
+        _connFac = new AMQConnectionFactory(CONN_URL);
+    }
+
+    /**
+     * Utility test method to allow running the preparation tool
+     * using the test framework
+     */
+    public void testPrepareBroker() throws Exception
+    {
+        prepareBroker();
+    }
+
+    private void prepareBroker() throws Exception
+    {
+        prepareQueues();
+        prepareDurableSubscription();
+    }
+
+    /**
+     * Prepare a queue for use in testing message and binding recovery
+     * after the upgrade is performed.
+     * 
+     * - Create a transacted session on the connection.
+     * - Use a consumer to create the (durable by default) queue.
+     * - Send 5 large messages to test (multi-frame) content recovery.
+     * - Send 1 small message to test (single-frame) content recovery.
+     * - Commit the session.
+     * - Send 5 small messages to test that uncommitted messages are not recovered.
+     *   following the upgrade.
+     * - Close the session.  
+     */
+    private void prepareQueues() throws Exception
+    {
+        // Create a connection
+        Connection connection = _connFac.createConnection();
+        connection.start();
+        connection.setExceptionListener(new ExceptionListener()
+        {
+            public void onException(JMSException e)
+            {
+                e.printStackTrace();
+            }
+        });
+        // Create a session on the connection, transacted to confirm delivery
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Queue queue = session.createQueue(QUEUE_NAME);
+        // Create a consumer to ensure the queue gets created
+        // (and enter it into the store, as queues are made durable by default)
+        MessageConsumer messageConsumer = session.createConsumer(queue);
+        messageConsumer.close();
+
+        // Create a Message producer
+        MessageProducer messageProducer = session.createProducer(queue);
+
+        // Publish 5 persistent messages, 256k chars to ensure they are multi-frame
+        sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 256*1024, 5);
+        // Publish 5 persistent messages, 1k chars to ensure they are single-frame
+        sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 1*1024, 5);
+
+        session.commit();
+
+        // Publish 5 persistent messages which will NOT be committed and so should be 'lost'
+        sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 1*1024, 5);
+
+        session.close();
+        connection.close();
+    }
+
+    /**
+     * Prepare a DurableSubscription backing queue for use in testing selector 
+     * recovery and queue exclusivity marking during the upgrade process.
+     * 
+     * - Create a transacted session on the connection.
+     * - Open and close a DurableSubscription with selector to create the backing queue.
+     * - Send a message which matches the selector.
+     * - Send a message which does not match the selector.
+     * - Send a message which matches the selector but will remain uncommitted.
+     * - Close the session.
+     */
+    private void prepareDurableSubscription() throws Exception
+    {
+
+        // Create a connection
+        TopicConnection connection = _connFac.createTopicConnection();
+        connection.start();
+        connection.setExceptionListener(new ExceptionListener()
+        {
+            public void onException(JMSException e)
+            {
+                e.printStackTrace();
+            }
+        });
+        // Create a session on the connection, transacted to confirm delivery
+        Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+        Topic topic = session.createTopic(TOPIC_NAME);
+
+        // Create and register a durable subscriber with selector and then close it
+        TopicSubscriber durSub1 = session.createDurableSubscriber(topic, SUB_NAME,"testprop='true'", false);
+        durSub1.close();
+
+        // Create a publisher and send a persistent message which matches the selector 
+        // followed by one that does not match, and another which matches but is not
+        // committed and so should be 'lost'
+        TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
+        TopicPublisher publisher = pubSession.createPublisher(topic);
+
+        publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true");
+        publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false");
+        pubSession.commit();
+        publishMessages(session, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true");
+
+        publisher.close();
+        pubSession.close();
+
+    }
+
+    public static void sendMessages(Session session, MessageProducer messageProducer, 
+            Destination dest, int deliveryMode, int length, int numMesages) throws JMSException
+    {
+        for (int i = 1; i <= numMesages; i++)
+        {
+            Message message = session.createTextMessage(generateString(length));
+            message.setIntProperty("ID", i);
+            messageProducer.send(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+        }
+    }
+
+    public static void publishMessages(Session session, TopicPublisher publisher, 
+            Destination dest, int deliveryMode, int length, int numMesages, String selectorProperty) throws JMSException
+    {
+        for (int i = 1; i <= numMesages; i++)
+        {
+            Message message = session.createTextMessage(generateString(length));
+            message.setIntProperty("ID", i);
+            message.setStringProperty("testprop", selectorProperty);
+            publisher.publish(message, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+        }
+    }
+
+    /**
+     * Generates a string of a given length consisting of the sequence 0,1,2,..,9,0,1,2.
+     * 
+     * @param length number of characters in the string 
+     * @return string sequence of the given length
+     */
+    public static String generateString(int length)
+    {
+        char[] base_chars = new char[]{'0','1','2','3','4','5','6','7','8','9'};
+        char[] chars = new char[length];
+        for (int i = 0; i < (length); i++)
+        {
+            chars[i] = base_chars[i % 10];
+        }
+        return new String(chars);
+    }
+
+    /**
+     * Run the preparation tool.
+     * @param args Command line arguments.
+     */
+    public static void main(String[] args) throws Exception
+    {
+        BDBStoreUpgradeTestPreparer producer = new BDBStoreUpgradeTestPreparer();
+        producer.prepareBroker();
+    }
+}
\ No newline at end of file

Added: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,540 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SUB_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.je.DatabaseEntry;
+
+/**
+ * Tests upgrading a BDB store and using it with the new broker 
+ * after the required contents are entered into the store using 
+ * an old broker with the BDBStoreUpgradeTestPreparer. The store
+ * will then be used to verify that the upgraded is completed 
+ * properly and that once upgraded it functions as expected with 
+ * the new broker.
+ */
+public class BDBUpgradeTest extends QpidBrokerTestCase
+{
+    protected static final Logger _logger = LoggerFactory.getLogger(BDBUpgradeTest.class);
+
+    private static final String STRING_1024 = BDBStoreUpgradeTestPreparer.generateString(1024);
+    private static final String STRING_1024_256 = BDBStoreUpgradeTestPreparer.generateString(1024*256);
+    private static final String QPID_WORK_ORIG = System.getProperty("QPID_WORK");
+    private static final String QPID_HOME = System.getProperty("QPID_HOME");
+    private static final int VERSION_4 = 4;
+
+    private String _fromDir;
+    private String _toDir;
+    private String _toDirTwice;
+
+    @Override
+    public void setUp() throws Exception
+    {
+        assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG);
+        assertNotNull("QPID_HOME must be set", QPID_HOME);
+
+        if(! isExternalBroker())
+        {
+            //override QPID_WORK to add the InVM port used so the store
+            //output from the upgrade tool can be found by the broker
+            setSystemProperty("QPID_WORK", QPID_WORK_ORIG + "/" + getPort());
+        }
+
+        _fromDir = QPID_HOME + "/bdbstore-to-upgrade/test-store";
+        _toDir = getWorkDirBaseDir() + "/bdbstore/test-store";
+        _toDirTwice = getWorkDirBaseDir() + "/bdbstore-upgraded-twice";
+
+        //Clear the two target directories if they exist.
+        File directory = new File(_toDir);
+        if (directory.exists() && directory.isDirectory())
+        {
+            FileUtils.delete(directory, true);
+        }
+        directory = new File(_toDirTwice);
+        if (directory.exists() && directory.isDirectory())
+        {
+            FileUtils.delete(directory, true);
+        }
+
+        //Upgrade the test store.
+        upgradeBrokerStore(_fromDir, _toDir);
+
+        //override the broker config used and then start the broker with the updated store
+        _configFile = new File(QPID_HOME, "etc/config-systests-bdb.xml");
+        setConfigurationProperty("management.enabled", "true");
+
+        super.setUp();       
+    }
+
+    private String getWorkDirBaseDir()
+    {
+        return QPID_WORK_ORIG + (isInternalBroker() ? "" : "/" + getPort());
+    }
+
+    /**
+     * Tests that the core upgrade method of the store upgrade tool passes through the exception
+     * from the BDBMessageStore indicating that the data on disk can't be loaded as the previous
+     * version because it has already been upgraded.
+     * @throws Exception
+     */
+    public void testMultipleUpgrades() throws Exception
+    {
+        //stop the broker started by setUp() in order to allow the second upgrade attempt to proceed
+        stopBroker();
+
+        try
+        {
+            new BDBStoreUpgrade(_toDir, _toDirTwice, null, false, true).upgradeFromVersion(VERSION_4);
+            fail("Second Upgrade Succeeded");
+        }
+        catch (Exception e)
+        {
+            System.err.println("Showing stack trace, we are expecting an 'Unable to load BDBStore' error");
+            e.printStackTrace();
+            assertTrue("Incorrect Exception Thrown:" + e.getMessage(),
+                    e.getMessage().contains("Unable to load BDBStore as version 4. Store on disk contains version 5 data"));
+        }
+    }
+
+    /**
+     * Test that the selector applied to the DurableSubscription was successfully
+     * transfered to the new store, and functions as expected with continued use
+     * by monitoring message count while sending new messages to the topic.
+     */
+    public void testSelectorDurability() throws Exception
+    {
+        JMXTestUtils jmxUtils = null;
+        try
+        {
+            jmxUtils = new JMXTestUtils(this, "guest", "guest");
+            jmxUtils.open();
+        }
+        catch (Exception e)
+        {
+            fail("Unable to establish JMX connection, test cannot proceed");
+        }
+
+        try
+        {
+            ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME);
+            assertEquals("DurableSubscription backing queue should have 1 message on it initially", 
+                          new Integer(1), dursubQueue.getMessageCount());
+            
+            // Create a connection and start it
+            TopicConnection connection = (TopicConnection) getConnection();
+            connection.start();
+            
+            // Send messages which don't match and do match the selector, checking message count
+            TopicSession pubSession = connection.createTopicSession(true, org.apache.qpid.jms.Session.SESSION_TRANSACTED);
+            Topic topic = pubSession.createTopic(TOPIC_NAME);
+            TopicPublisher publisher = pubSession.createPublisher(topic);
+            
+            BDBStoreUpgradeTestPreparer.publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false");
+            pubSession.commit();
+            assertEquals("DurableSubscription backing queue should still have 1 message on it", 
+                        new Integer(1), dursubQueue.getMessageCount());
+            
+            BDBStoreUpgradeTestPreparer.publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true");
+            pubSession.commit();
+            assertEquals("DurableSubscription backing queue should now have 2 messages on it", 
+                        new Integer(2), dursubQueue.getMessageCount());
+
+            dursubQueue.clearQueue();
+            pubSession.close();
+        }
+        finally
+        {
+            jmxUtils.close();
+        }
+    }
+
+    /**
+     * Test that the backing queue for the durable subscription created was successfully
+     * detected and set as being exclusive during the upgrade process, and that the
+     * regular queue was not.
+     */
+    public void testQueueExclusivity() throws Exception
+    {
+        JMXTestUtils jmxUtils = null;
+        try
+        {
+            jmxUtils = new JMXTestUtils(this, "guest", "guest");
+            jmxUtils.open();
+        }
+        catch (Exception e)
+        {
+            fail("Unable to establish JMX connection, test cannot proceed");
+        }
+
+        try
+        {
+            ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_NAME);
+            assertFalse("Queue should not have been marked as Exclusive during upgrade", queue.isExclusive());
+
+            ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME);
+            assertTrue("DurableSubscription backing queue should have been marked as Exclusive during upgrade", dursubQueue.isExclusive());
+        }
+        finally
+        {
+            jmxUtils.close();
+        }
+    }
+
+    /**
+     * Test that the upgraded queue continues to function properly when used
+     * for persistent messaging and restarting the broker. 
+     * 
+     * Sends the new messages to the queue BEFORE consuming those which were
+     * sent before the upgrade. In doing so, this also serves to test that 
+     * the queue bindings were successfully transitioned during the upgrade.
+     */
+    public void testBindingAndMessageDurabability() throws Exception
+    {
+        // Create a connection and start it
+        TopicConnection connection = (TopicConnection) getConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(QUEUE_NAME);
+        MessageProducer messageProducer = session.createProducer(queue);
+
+        // Send a new message
+        BDBStoreUpgradeTestPreparer.sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 256*1024, 1);
+
+        session.close();
+
+        // Restart the broker
+        restartBroker();
+
+        // Drain the queue of all messages
+        connection = (TopicConnection) getConnection();
+        connection.start();
+        consumeQueueMessages(connection, true);
+    }
+
+    /**
+     * Test that all of the committed persistent messages previously sent to 
+     * the broker are properly received following update of the MetaData and
+     * Content entries during the store upgrade process.
+     */
+    public void testConsumptionOfUpgradedMessages() throws Exception
+    {
+        // Create a connection and start it
+        Connection connection = getConnection();
+        connection.start();
+
+        consumeDurableSubscriptionMessages(connection);
+        consumeQueueMessages(connection, false);
+    }
+
+    /**
+     * Tests store migration containing messages for non-existing queue.
+     *
+     * @throws Exception
+     */
+    public void testMigrationOfMessagesForNonExistingQueues() throws Exception
+    {
+        stopBroker();
+
+        // copy store data into a new location for adding of phantom message
+        File storeLocation = new File(_fromDir);
+        File target = new File(_toDirTwice);
+        if (!target.exists())
+        {
+            target.mkdirs();
+        }
+        FileUtils.copyRecursive(storeLocation, target);
+
+        // delete migrated data
+        File directory = new File(_toDir);
+        if (directory.exists() && directory.isDirectory())
+        {
+            FileUtils.delete(directory, true);
+        }
+
+        // test data
+        String nonExistingQueueName = getTestQueueName();
+        String messageText = "Test Phantom Message";
+
+        // add message
+        addMessageForNonExistingQueue(target, VERSION_4, nonExistingQueueName, messageText);
+
+        String[] inputs = { "Yes", "Yes", "Yes" };
+        upgradeBrokerStoreInInterractiveMode(_toDirTwice, _toDir, inputs);
+
+        // start broker
+        startBroker();
+
+        // Create a connection and start it
+        Connection connection = getConnection();
+        connection.start();
+
+        // consume a message for non-existing store
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(nonExistingQueueName);
+        MessageConsumer messageConsumer = session.createConsumer(queue);
+        Message message = messageConsumer.receive(1000);
+
+        // assert consumed message
+        assertNotNull("Message was not migrated!", message);
+        assertTrue("Unexpected message received!", message instanceof TextMessage);
+        String text = ((TextMessage) message).getText();
+        assertEquals("Message migration failed!", messageText, text);
+    }
+
+    /**
+     * An utility method to upgrade broker with simulation user interactions
+     *
+     * @param fromDir
+     *            location of the store to migrate
+     * @param toDir
+     *            location of where migrated data will be stored
+     * @param inputs
+     *            user answers on upgrade tool questions
+     * @throws Exception
+     */
+    private void upgradeBrokerStoreInInterractiveMode(String fromDir, String toDir, final String[] inputs)
+            throws Exception
+    {
+        // save to restore system.in after data migration
+        InputStream stdin = System.in;
+
+        // set fake system in to simulate user interactions
+        // FIXME: it is a quite dirty simulator of system input but it does the job
+        System.setIn(new InputStream()
+        {
+
+            int counter = 0;
+
+            public synchronized int read(byte b[], int off, int len)
+            {
+                byte[] src = (inputs[counter] + "\n").getBytes();
+                System.arraycopy(src, 0, b, off, src.length);
+                counter++;
+                return src.length;
+            }
+
+            @Override
+            public int read() throws IOException
+            {
+                return -1;
+            }
+        });
+
+        try
+        {
+            // Upgrade the test store.
+            new BDBStoreUpgrade(fromDir, toDir, null, true, true).upgradeFromVersion(VERSION_4);
+        }
+        finally
+        {
+            // restore system in
+            System.setIn(stdin);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private void addMessageForNonExistingQueue(File storeLocation, int storeVersion, String nonExistingQueueName,
+            String messageText) throws Exception
+    {
+        final AMQShortString queueName = new AMQShortString(nonExistingQueueName);
+        BDBMessageStore store = new BDBMessageStore(storeVersion);
+        store.configure(storeLocation, false);
+        try
+        {
+            store.start();
+
+            // store message objects
+            ByteBuffer completeContentBody = ByteBuffer.wrap(messageText.getBytes("UTF-8"));
+            long bodySize = completeContentBody.limit();
+            MessagePublishInfo pubInfoBody = new MessagePublishInfoImpl(new AMQShortString("amq.direct"), false,
+                    false, queueName);
+            BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+            props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue());
+            props.setContentType("text/plain");
+            props.setType("text/plain");
+            props.setMessageId("whatever");
+            props.setEncoding("UTF-8");
+            props.getHeaders().setString("Test", "MST");
+            MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+            int classForBasic = methodRegistry.createBasicQosOkBody().getClazz();
+            ContentHeaderBody contentHeaderBody = new ContentHeaderBody(classForBasic, 1, props, bodySize);
+
+            // add content entry to database
+            long messageId = store.getNewMessageId();
+            TupleBinding<MessageContentKey> contentKeyTB = new MessageContentKeyTupleBindingFactory(storeVersion).getInstance();
+            MessageContentKey contentKey = null;
+            if (storeVersion == VERSION_4)
+            {
+                contentKey = new MessageContentKey_4(messageId, 0);
+            }
+            else
+            {
+                throw new Exception(storeVersion + " is not supported");
+            }
+            DatabaseEntry key = new DatabaseEntry();
+            contentKeyTB.objectToEntry(contentKey, key);
+            DatabaseEntry data = new DatabaseEntry();
+            ContentTB contentTB = new ContentTB();
+            contentTB.objectToEntry(completeContentBody, data);
+            store.getContentDb().put(null, key, data);
+
+            // add meta data entry to database
+            TupleBinding<Long> longTB = TupleBinding.getPrimitiveBinding(Long.class);
+            TupleBinding<Object> metaDataTB = new MessageMetaDataTupleBindingFactory(storeVersion).getInstance();
+            key = new DatabaseEntry();
+            data = new DatabaseEntry();
+            longTB.objectToEntry(new Long(messageId), key);
+            MessageMetaData metaData = new MessageMetaData(pubInfoBody, contentHeaderBody, 1);
+            metaDataTB.objectToEntry(metaData, data);
+            store.getMetaDataDb().put(null, key, data);
+
+            // add delivery entry to database
+            TransactionLogResource mockQueue = new TransactionLogResource()
+            {
+                public String getResourceName()
+                {
+                    return queueName.asString();
+                }
+            };
+            TransactionLog log = (TransactionLog) store;
+            TransactionLog.Transaction txn = log.newTransaction();
+            txn.enqueueMessage(mockQueue, messageId);
+            txn.commitTran();
+        }
+        finally
+        {
+            // close store
+            store.close();
+        }
+    }
+
+    private void consumeDurableSubscriptionMessages(Connection connection) throws Exception
+    {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic(TOPIC_NAME);
+
+        TopicSubscriber durSub = session.createDurableSubscriber(topic, SUB_NAME,"testprop='true'", false);
+
+        // Retrieve the matching message 
+        Message m = durSub.receive(2000);
+        assertNotNull("Failed to receive an expected message", m);
+        assertEquals("Selector property did not match", "true", m.getStringProperty("testprop"));
+        assertEquals("ID property did not match", 1, m.getIntProperty("ID"));
+        assertEquals("Message content was not as expected",BDBStoreUpgradeTestPreparer.generateString(1024) , ((TextMessage)m).getText());
+
+        // Verify that neither the non-matching or uncommitted message are received
+        m = durSub.receive(1000);
+        assertNull("No more messages should have been recieved", m);
+
+        durSub.close();
+        session.close();
+    }
+
+    private void consumeQueueMessages(Connection connection, boolean extraMessage) throws Exception
+    {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(QUEUE_NAME);
+
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message m;
+
+        // Retrieve the initial pre-upgrade messages
+        for (int i=1; i <= 5 ; i++)
+        {
+            m = consumer.receive(2000);
+            assertNotNull("Failed to receive an expected message", m);
+            assertEquals("ID property did not match", i, m.getIntProperty("ID"));
+            assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText());
+        }
+        for (int i=1; i <= 5 ; i++)
+        {
+            m = consumer.receive(2000);
+            assertNotNull("Failed to receive an expected message", m);
+            assertEquals("ID property did not match", i, m.getIntProperty("ID"));
+            assertEquals("Message content was not as expected", STRING_1024, ((TextMessage)m).getText());
+        }
+
+        if(extraMessage)
+        {
+            //verify that the extra message is received
+            m = consumer.receive(2000);
+            assertNotNull("Failed to receive an expected message", m);
+            assertEquals("ID property did not match", 1, m.getIntProperty("ID"));
+            assertEquals("Message content was not as expected", STRING_1024_256, ((TextMessage)m).getText());
+        }
+
+        // Verify that no more messages are received
+        m = consumer.receive(1000);
+        assertNull("No more messages should have been recieved", m);
+
+        consumer.close();
+        session.close();
+    }
+
+    private void upgradeBrokerStore(String fromDir, String toDir) throws Exception
+    {
+        new BDBStoreUpgrade(_fromDir, _toDir, null, false, true).upgradeFromVersion(VERSION_4);
+    }
+}

Added: qpid/trunk/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb?rev=1175235&view=auto
==============================================================================
Files qpid/trunk/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb (added) and qpid/trunk/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb Sat Sep 24 20:16:00 2011 differ



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org