You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2011/09/24 22:16:02 UTC
svn commit: r1175235 [4/5] - in /qpid/trunk: ./ qpid/java/
qpid/java/bdbstore/ qpid/java/bdbstore/bin/ qpid/java/bdbstore/etc/
qpid/java/bdbstore/etc/scripts/ qpid/java/bdbstore/src/
qpid/java/bdbstore/src/main/ qpid/java/bdbstore/src/main/java/ qpid/j...
Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_4.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,162 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+
+import java.io.*;
+
+/**
+ * Tuple binding for 0-8/0-9 message meta data.
+ * <p>
+ * An entry holds, in order: the exchange and routing key (short strings), the mandatory and
+ * immediate flags (booleans), the content header body as an int length followed by that many
+ * bytes, and finally the content chunk count (int).
+ */
+public class MessageMetaDataTB_4 extends TupleBinding<Object>
+{
+    private static final Logger _log = Logger.getLogger(MessageMetaDataTB_4.class);
+
+    public MessageMetaDataTB_4()
+    {
+    }
+
+    /**
+     * Decodes a stored entry into a {@link MessageMetaData}.
+     *
+     * @param tupleInput the stored entry to read from
+     * @return the decoded meta data, or null if anything went wrong while decoding (the
+     *         failure is logged)
+     */
+    @Override
+    public Object entryToObject(TupleInput tupleInput)
+    {
+        try
+        {
+            // the reads below mirror the write order used by objectToEntry
+            final MessagePublishInfo publishInfo = readMessagePublishInfo(tupleInput);
+            final ContentHeaderBody header = readContentHeaderBody(tupleInput);
+            final int chunkCount = tupleInput.readInt();
+
+            return new MessageMetaData(publishInfo, header, chunkCount);
+        }
+        catch (Exception e)
+        {
+            // no checked exception can be thrown from here, so the failure is logged and
+            // reported to the caller as null
+            _log.error("Error converting entry to object: " + e, e);
+            return null;
+        }
+    }
+
+    /**
+     * Encodes a {@link MessageMetaData} into a stored entry.
+     *
+     * @param object the meta data to store; must be a {@link MessageMetaData}
+     * @param tupleOutput the entry to write into
+     * @throws RuntimeException if the publish info or content header cannot be written
+     */
+    @Override
+    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    {
+        final MessageMetaData metaData = (MessageMetaData) object;
+        try
+        {
+            writeMessagePublishInfo(metaData.getMessagePublishInfo(), tupleOutput);
+        }
+        catch (AMQException e)
+        {
+            // the BDB interface precludes checked exceptions, so wrap it; in practice this
+            // is not expected to happen
+            throw new RuntimeException("Error converting object to entry: " + e, e);
+        }
+        writeContentHeader(metaData.getContentHeaderBody(), tupleOutput);
+        tupleOutput.writeInt(metaData.getContentChunkCount());
+    }
+
+    /**
+     * Reads exchange, routing key, mandatory flag and immediate flag, in that order, and
+     * exposes them through a {@link MessagePublishInfo} view.
+     */
+    private MessagePublishInfo readMessagePublishInfo(TupleInput tupleInput)
+    {
+        final AMQShortString storedExchange = AMQShortStringEncoding.readShortString(tupleInput);
+        final AMQShortString storedRoutingKey = AMQShortStringEncoding.readShortString(tupleInput);
+        final boolean storedMandatory = tupleInput.readBoolean();
+        final boolean storedImmediate = tupleInput.readBoolean();
+
+        return new MessagePublishInfo()
+        {
+            public AMQShortString getExchange()
+            {
+                return storedExchange;
+            }
+
+            public AMQShortString getRoutingKey()
+            {
+                return storedRoutingKey;
+            }
+
+            public boolean isMandatory()
+            {
+                return storedMandatory;
+            }
+
+            public boolean isImmediate()
+            {
+                return storedImmediate;
+            }
+
+            public void setExchange(AMQShortString exchange)
+            {
+                // intentionally empty: the value read from the store is never replaced
+            }
+        };
+    }
+
+    /**
+     * Reads the length-prefixed content header bytes and decodes them.
+     *
+     * @throws AMQFrameDecodingException if decoding fails, including on an I/O error while
+     *         reading from the in-memory copy of the bytes
+     */
+    private ContentHeaderBody readContentHeaderBody(TupleInput tupleInput)
+            throws AMQFrameDecodingException, AMQProtocolVersionException
+    {
+        final int size = tupleInput.readInt();
+        final byte[] payload = new byte[size];
+        tupleInput.readFast(payload);
+
+        final DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload));
+        try
+        {
+            return ContentHeaderBody.createFromBuffer(in, size);
+        }
+        catch (IOException e)
+        {
+            throw new AMQFrameDecodingException(null, e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Writes exchange, routing key, mandatory flag and immediate flag, in that order.
+     */
+    private void writeMessagePublishInfo(MessagePublishInfo publishBody, TupleOutput tupleOutput) throws AMQException
+    {
+        AMQShortStringEncoding.writeShortString(publishBody.getExchange(), tupleOutput);
+        AMQShortStringEncoding.writeShortString(publishBody.getRoutingKey(), tupleOutput);
+        tupleOutput.writeBoolean(publishBody.isMandatory());
+        tupleOutput.writeBoolean(publishBody.isImmediate());
+    }
+
+    /**
+     * Writes the content header as its reported size followed by its payload bytes.
+     */
+    private void writeContentHeader(ContentHeaderBody headerBody, TupleOutput tupleOutput)
+    {
+        final int size = headerBody.getSize();
+        final ByteArrayOutputStream payload = new ByteArrayOutputStream(size);
+        try
+        {
+            // serialise into memory first, then copy into the tuple behind the length prefix
+            headerBody.writePayload(new DataOutputStream(payload));
+            tupleOutput.writeInt(size);
+            tupleOutput.writeFast(payload.toByteArray());
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+}
Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTB_5.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,77 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.MessageMetaDataType;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+
+/**
+ * Handles the mapping to and from message meta data of any {@link MessageMetaDataType}.
+ * <p>
+ * An entry is an int length followed by that many bytes: the first byte is the ordinal of the
+ * meta data type and the remaining bytes are written by the meta data object itself.
+ */
+public class MessageMetaDataTB_5 extends MessageMetaDataTB_4
+{
+    private static final Logger _log = Logger.getLogger(MessageMetaDataTB_5.class);
+
+    /**
+     * Decodes a stored entry using the factory of the type named by its first byte.
+     *
+     * @param tupleInput the stored entry to read from
+     * @return the decoded meta data, or null if anything went wrong while decoding (the
+     *         failure is logged)
+     */
+    @Override
+    public Object entryToObject(TupleInput tupleInput)
+    {
+        try
+        {
+            final int length = tupleInput.readInt();
+            final byte[] stored = new byte[length];
+            tupleInput.readFast(stored);
+
+            // present everything after the leading type byte as a buffer starting at zero
+            java.nio.ByteBuffer payload = java.nio.ByteBuffer.wrap(stored);
+            payload.position(1);
+            payload = payload.slice();
+
+            final MessageMetaDataType type = MessageMetaDataType.values()[stored[0]];
+            return type.getFactory().createMetaData(payload);
+        }
+        catch (Exception e)
+        {
+            // no checked exception can be thrown from here, so the failure is logged and
+            // reported to the caller as null
+            _log.error("Error converting entry to object: " + e, e);
+            return null;
+        }
+    }
+
+    /**
+     * Encodes the meta data as a type byte followed by its own storable form, prefixed with
+     * the combined length.
+     *
+     * @param object the meta data to store; must be a {@link StorableMessageMetaData}
+     * @param tupleOutput the entry to write into
+     */
+    @Override
+    public void objectToEntry(Object object, TupleOutput tupleOutput)
+    {
+        final StorableMessageMetaData metaData = (StorableMessageMetaData) object;
+
+        // one extra byte in front of the payload carries the type ordinal
+        final int length = 1 + metaData.getStorableSize();
+        final byte[] stored = new byte[length];
+        stored[0] = (byte) metaData.getType().ordinal();
+
+        // the meta data writes itself into the region behind the type byte
+        final java.nio.ByteBuffer payload = java.nio.ByteBuffer.wrap(stored, 1, length - 1).slice();
+        metaData.writeToBuffer(0, payload);
+
+        tupleOutput.writeInt(length);
+        tupleOutput.writeFast(stored);
+    }
+}
Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/MessageMetaDataTupleBindingFactory.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+
+/**
+ * Chooses the message meta data tuple binding that matches a version number.
+ */
+public class MessageMetaDataTupleBindingFactory extends TupleBindingFactory<Object>
+{
+    public MessageMetaDataTupleBindingFactory(int version)
+    {
+        super(version);
+    }
+
+    /**
+     * @return a {@link MessageMetaDataTB_4} for version 4, otherwise a
+     *         {@link MessageMetaDataTB_5}
+     */
+    public TupleBinding<Object> getInstance()
+    {
+        if (_version == 4)
+        {
+            return new MessageMetaDataTB_4();
+        }
+        // version 5 and any other version number use the version 5 binding
+        return new MessageMetaDataTB_5();
+    }
+}
Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueEntryTB.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.QueueEntryKey;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+/**
+ * Tuple binding for {@link QueueEntryKey}: the queue name as a short string followed by the
+ * message id as a long.
+ */
+public class QueueEntryTB extends TupleBinding<QueueEntryKey>
+{
+    /**
+     * Reads a queue name and a message id, in that order, and builds the key from them.
+     *
+     * @param tupleInput the stored entry to read from
+     * @return the decoded key
+     */
+    @Override
+    public QueueEntryKey entryToObject(TupleInput tupleInput)
+    {
+        final AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
+        // readLong() yields a primitive; hold it as one instead of boxing it into a Long
+        final long messageId = tupleInput.readLong();
+
+        return new QueueEntryKey(queueName, messageId);
+    }
+
+    /**
+     * Writes the key's queue name followed by its message id.
+     *
+     * @param mk the key to store
+     * @param tupleOutput the entry to write into
+     */
+    @Override
+    public void objectToEntry(QueueEntryKey mk, TupleOutput tupleOutput)
+    {
+        AMQShortStringEncoding.writeShortString(mk.getQueueName(), tupleOutput);
+        tupleOutput.writeLong(mk.getMessageId());
+    }
+}
\ No newline at end of file
Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,25 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+/**
+ * Marker interface for queue tuple bindings; it declares no methods.
+ */
+public interface QueueTuple
+{
+}
Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTupleBindingFactory.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,46 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+
+/**
+ * Chooses the {@link QueueRecord} tuple binding that matches a version number.
+ */
+public class QueueTupleBindingFactory extends TupleBindingFactory<QueueRecord>
+{
+    public QueueTupleBindingFactory(int version)
+    {
+        super(version);
+    }
+
+    /**
+     * @return a {@link QueueTuple_4} for version 4, otherwise a {@link QueueTuple_5}
+     */
+    public TupleBinding<QueueRecord> getInstance()
+    {
+        switch (_version)
+        {
+            case 4:
+                return new QueueTuple_4();
+            case 5:
+            default:
+                // any version other than 4 gets the version 5 binding
+                return new QueueTuple_5();
+        }
+    }
+}
Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_4.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,72 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.DatabaseException;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+/**
+ * Tuple binding for {@link QueueRecord} that stores the queue name, owner and arguments.
+ * Exclusivity is not stored, so records read through this binding are created with the
+ * exclusive flag set to false.
+ */
+public class QueueTuple_4 extends TupleBinding<QueueRecord> implements QueueTuple
+{
+    protected static final Logger _logger = Logger.getLogger(QueueTuple_4.class);
+
+    // not referenced anywhere within this class
+    protected FieldTable _arguments;
+
+    public QueueTuple_4()
+    {
+        super();
+    }
+
+    /**
+     * Reads name, owner and arguments, in that order.
+     *
+     * @param tupleInput the stored entry to read from
+     * @return the decoded record, or null if a {@link DatabaseException} occurred (the
+     *         failure is logged)
+     */
+    public QueueRecord entryToObject(TupleInput tupleInput)
+    {
+        try
+        {
+            final AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
+            final AMQShortString queueOwner = AMQShortStringEncoding.readShortString(tupleInput);
+            // the queue arguments were added in version 2 of this table
+            final FieldTable queueArguments = FieldTableEncoding.readFieldTable(tupleInput);
+
+            return new QueueRecord(queueName, queueOwner, false, queueArguments);
+        }
+        catch (DatabaseException e)
+        {
+            _logger.error("Unable to create binding: " + e, e);
+            return null;
+        }
+    }
+
+    /**
+     * Writes name, owner and arguments, in that order.
+     *
+     * @param queue the record to store
+     * @param tupleOutput the entry to write into
+     */
+    public void objectToEntry(QueueRecord queue, TupleOutput tupleOutput)
+    {
+        AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput);
+        AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
+        // the queue arguments were added in version 2 of this table
+        FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput);
+    }
+}
Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_5.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+import com.sleepycat.je.DatabaseException;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
+import org.apache.qpid.server.store.berkeleydb.FieldTableEncoding;
+import org.apache.qpid.server.store.berkeleydb.records.QueueRecord;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
+/**
+ * Tuple binding for {@link QueueRecord} that stores the queue name, owner, arguments and
+ * the exclusivity flag. The first three fields are laid out as in {@link QueueTuple_4}; the
+ * flag is appended after them.
+ */
+public class QueueTuple_5 extends QueueTuple_4
+{
+    // hides the logger of the same name in QueueTuple_4 so messages use this class's category
+    protected static final Logger _logger = Logger.getLogger(QueueTuple_5.class);
+
+    // hides the field of the same name in QueueTuple_4; not referenced within this class
+    protected FieldTable _arguments;
+
+    public QueueTuple_5()
+    {
+        super();
+    }
+
+    /**
+     * Reads name, owner, arguments and the exclusivity flag, in that order.
+     *
+     * @param tupleInput the stored entry to read from
+     * @return the decoded record, or null if a {@link DatabaseException} occurred (the
+     *         failure is logged)
+     */
+    public QueueRecord entryToObject(TupleInput tupleInput)
+    {
+        try
+        {
+            final AMQShortString queueName = AMQShortStringEncoding.readShortString(tupleInput);
+            final AMQShortString queueOwner = AMQShortStringEncoding.readShortString(tupleInput);
+            // the queue arguments were added in version 2 of this table
+            final FieldTable queueArguments = FieldTableEncoding.readFieldTable(tupleInput);
+            // the queue exclusivity was added in version 3 of this table
+            final boolean queueExclusive = tupleInput.readBoolean();
+
+            return new QueueRecord(queueName, queueOwner, queueExclusive, queueArguments);
+        }
+        catch (DatabaseException e)
+        {
+            _logger.error("Unable to create binding: " + e, e);
+            return null;
+        }
+    }
+
+    /**
+     * Writes name, owner, arguments and the exclusivity flag, in that order.
+     *
+     * @param queue the record to store
+     * @param tupleOutput the entry to write into
+     */
+    public void objectToEntry(QueueRecord queue, TupleOutput tupleOutput)
+    {
+        AMQShortStringEncoding.writeShortString(queue.getNameShortString(), tupleOutput);
+        AMQShortStringEncoding.writeShortString(queue.getOwner(), tupleOutput);
+        // the queue arguments were added in version 2 of this table
+        FieldTableEncoding.writeFieldTable(queue.getArguments(), tupleOutput);
+        // the queue exclusivity was added in version 3 of this table
+        tupleOutput.writeBoolean(queue.isExclusive());
+    }
+}
Added: qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/TupleBindingFactory.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.tuples;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+
+/**
+ * Base class for factories that select a {@link TupleBinding} implementation by version
+ * number.
+ *
+ * @param <E> the type the produced bindings convert to and from
+ */
+public abstract class TupleBindingFactory<E>
+{
+    /** Version number supplied at construction; available to subclasses. */
+    protected int _version;
+
+    /**
+     * @param version the version number to record for later binding selection
+     */
+    public TupleBindingFactory(int version)
+    {
+        this._version = version;
+    }
+
+    /**
+     * @return a tuple binding chosen by the concrete factory
+     */
+    public abstract TupleBinding<E> getInstance();
+}
Added: qpid/trunk/qpid/java/bdbstore/src/resources/backup-log4j.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/resources/backup-log4j.xml?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/resources/backup-log4j.xml (added)
+++ qpid/trunk/qpid/java/bdbstore/src/resources/backup-log4j.xml Sat Sep 24 20:16:00 2011
@@ -0,0 +1,65 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+
+<!-- =============================================================================== -->
+<!-- This is a Log4j configuration specially created for the BDB Backup utility, -->
+<!-- it outputs logging to the console for specifically designated console loggers -->
+<!-- at info level or above only. This avoids spamming the user with any internals -->
+<!-- of the Qpid code. -->
+<!-- Use a different logging set up to capture debugging output to diagnose errors. -->
+<!-- =============================================================================== -->
+
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/" debug="false">
+
+ <!-- ====================================================== -->
+ <!-- Append messages to the console at info level or above. -->
+ <!-- ====================================================== -->
+
+ <appender name="CONSOLE" class="org.apache.log4j.ConsoleAppender">
+ <param name="Target" value="System.out"/>
+ <param name="Threshold" value="info"/>
+
+ <layout class="org.apache.log4j.PatternLayout">
+ <!-- Pattern: the message text followed by a line separator (no date, priority or category) -->
+ <param name="ConversionPattern" value="%m%n"/>
+ </layout>
+
+ </appender>
+
+ <!-- ================ -->
+ <!-- Limit categories -->
+ <!-- ================ -->
+
+ <category name="org.apache.qpid.server.store.berkeleydb.BDBBackup">
+ <priority value="info"/>
+ </category>
+
+ <!-- ======================= -->
+ <!-- Setup the Root category -->
+ <!-- ======================= -->
+
+ <root>
+ <appender-ref ref="CONSOLE"/>
+ </root>
+
+</log4j:configuration>
Added: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/AMQShortStringEncodingTest.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.framing.AMQShortString;
+
+import com.sleepycat.bind.tuple.TupleInput;
+import com.sleepycat.bind.tuple.TupleOutput;
+
+import junit.framework.TestCase;
+
+/**
+ * Round-trip tests for {@code AMQShortStringEncoding}, including the corner cases where the
+ * string is null or over 127 characters in length.
+ */
+public class AMQShortStringEncodingTest extends TestCase
+{
+    /** A null value must be read back as null. */
+    public void testWriteReadNullValues()
+    {
+        AMQShortString result = writeThenRead(null);
+        assertNull("Expected null but got " + result, result);
+    }
+
+    /** A 128 character string must be read back unchanged. */
+    public void testWriteReadShortStringWithLengthOver127()
+    {
+        AMQShortString value = createString('a', 128);
+        AMQShortString result = writeThenRead(value);
+        assertEquals("Expected " + value + " but got " + result, value, result);
+    }
+
+    /** A short string must be read back unchanged. */
+    public void testWriteReadShortStringWithLengthLess127()
+    {
+        AMQShortString value = new AMQShortString("test");
+        AMQShortString result = writeThenRead(value);
+        assertEquals("Expected " + value + " but got " + result, value, result);
+    }
+
+    /**
+     * Writes the value into a fresh tuple output, then reads it back from a tuple input
+     * built over the output's buffer bytes.
+     */
+    private AMQShortString writeThenRead(AMQShortString value)
+    {
+        // write into tuple output
+        TupleOutput tupleOutput = new TupleOutput();
+        AMQShortStringEncoding.writeShortString(value, tupleOutput);
+        byte[] written = tupleOutput.getBufferBytes();
+
+        // read from tuple input
+        TupleInput tupleInput = new TupleInput(written);
+        return AMQShortStringEncoding.readShortString(tupleInput);
+    }
+
+    /** Builds a short string made of {@code length} copies of {@code ch}. */
+    private AMQShortString createString(char ch, int length)
+    {
+        StringBuilder repeated = new StringBuilder();
+        int remaining = length;
+        while (remaining > 0)
+        {
+            repeated.append(ch);
+            remaining--;
+        }
+        return new AMQShortString(repeated.toString());
+    }
+
+}
Added: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,470 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.message.MessageMetaData_0_10;
+import org.apache.qpid.server.store.MessageMetaDataType;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageTransfer;
+
+/**
+ * Subclass of MessageStoreTest which runs the standard tests from the superclass against
+ * the BDB Store as well as additional tests specific to the BDB store-implementation.
+ */
+public class BDBMessageStoreTest extends org.apache.qpid.server.store.MessageStoreTest
+{
+ /**
+ * Tests that message metadata and content are successfully read back from a
+ * store after it has been reloaded. Both 0-8 and 0-10 metadata is used to
+ * verify their ability to co-exist within the store and be successfully retrieved.
+ */
+ public void testBDBMessagePersistence() throws Exception
+ {
+ MessageStore store = getVirtualHost().getMessageStore();
+
+ BDBMessageStore bdbStore = assertBDBStore(store);
+
+ // Create content ByteBuffers.
+ // Split the content into 2 chunks for the 0-8 message, as per broker behaviour.
+ // Use a single chunk for the 0-10 message as per broker behaviour.
+ String bodyText = "jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf";
+
+ ByteBuffer firstContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(0, 10).getBytes());
+ ByteBuffer secondContentBytes_0_8 = ByteBuffer.wrap(bodyText.substring(10).getBytes());
+
+ ByteBuffer completeContentBody_0_10 = ByteBuffer.wrap(bodyText.getBytes());
+ int bodySize = completeContentBody_0_10.limit(); // limit of the buffer wrapping the complete body bytes
+
+ /*
+ * Create and insert a 0-8 message (metadata and multi-chunk content)
+ */
+ MessagePublishInfo pubInfoBody_0_8 = createPublishInfoBody_0_8();
+ BasicContentHeaderProperties props_0_8 = createContentHeaderProperties_0_8();
+
+ ContentHeaderBody chb_0_8 = createContentHeaderBody_0_8(props_0_8, bodySize);
+
+ MessageMetaData messageMetaData_0_8 = new MessageMetaData(pubInfoBody_0_8, chb_0_8, 0);
+ StoredMessage<MessageMetaData> storedMessage_0_8 = bdbStore.addMessage(messageMetaData_0_8);
+
+ long origArrivalTime_0_8 = messageMetaData_0_8.getArrivalTime(); // captured now, compared again after the reload
+ long messageid_0_8 = storedMessage_0_8.getMessageNumber();
+
+ storedMessage_0_8.addContent(0, firstContentBytes_0_8);
+ storedMessage_0_8.addContent(firstContentBytes_0_8.limit(), secondContentBytes_0_8); // second chunk goes in at the offset where the first one ends
+ storedMessage_0_8.flushToStore();
+
+ /*
+ * Create and insert a 0-10 message (metadata and content)
+ */
+ MessageProperties msgProps_0_10 = createMessageProperties_0_10(bodySize);
+ DeliveryProperties delProps_0_10 = createDeliveryProperties_0_10();
+ Header header_0_10 = new Header(msgProps_0_10, delProps_0_10);
+
+ MessageTransfer xfr_0_10 = new MessageTransfer("destination", MessageAcceptMode.EXPLICIT,
+ MessageAcquireMode.PRE_ACQUIRED, header_0_10, completeContentBody_0_10);
+
+ MessageMetaData_0_10 messageMetaData_0_10 = new MessageMetaData_0_10(xfr_0_10);
+ StoredMessage<MessageMetaData_0_10> storedMessage_0_10 = bdbStore.addMessage(messageMetaData_0_10);
+
+ long origArrivalTime_0_10 = messageMetaData_0_10.getArrivalTime(); // captured now, compared again after the reload
+ long messageid_0_10 = storedMessage_0_10.getMessageNumber();
+
+ storedMessage_0_10.addContent(0, completeContentBody_0_10);
+ storedMessage_0_10.flushToStore();
+
+ /*
+ * reload the store only (read-only)
+ */
+ bdbStore = reloadStoreReadOnly(bdbStore); // every check below runs against the reopened store
+
+ /*
+ * Read back and validate the 0-8 message metadata and content
+ */
+ StorableMessageMetaData storeableMMD_0_8 = bdbStore.getMessageMetaData(messageid_0_8);
+
+ assertEquals("Unexpected message type",MessageMetaDataType.META_DATA_0_8, storeableMMD_0_8.getType());
+ assertTrue("Unexpected instance type", storeableMMD_0_8 instanceof MessageMetaData);
+ MessageMetaData returnedMMD_0_8 = (MessageMetaData) storeableMMD_0_8;
+
+ assertEquals("Message arrival time has changed", origArrivalTime_0_8, returnedMMD_0_8.getArrivalTime());
+
+ MessagePublishInfo returnedPubBody_0_8 = returnedMMD_0_8.getMessagePublishInfo();
+ assertEquals("Message exchange has changed", pubInfoBody_0_8.getExchange(), returnedPubBody_0_8.getExchange());
+ assertEquals("Immediate flag has changed", pubInfoBody_0_8.isImmediate(), returnedPubBody_0_8.isImmediate());
+ assertEquals("Mandatory flag has changed", pubInfoBody_0_8.isMandatory(), returnedPubBody_0_8.isMandatory());
+ assertEquals("Routing key has changed", pubInfoBody_0_8.getRoutingKey(), returnedPubBody_0_8.getRoutingKey());
+
+ ContentHeaderBody returnedHeaderBody_0_8 = returnedMMD_0_8.getContentHeaderBody();
+ assertEquals("ContentHeader ClassID has changed", chb_0_8.classId, returnedHeaderBody_0_8.classId);
+ assertEquals("ContentHeader weight has changed", chb_0_8.weight, returnedHeaderBody_0_8.weight);
+ assertEquals("ContentHeader bodySize has changed", chb_0_8.bodySize, returnedHeaderBody_0_8.bodySize);
+
+ BasicContentHeaderProperties returnedProperties_0_8 = (BasicContentHeaderProperties) returnedHeaderBody_0_8.getProperties();
+ assertEquals("Property ContentType has changed", props_0_8.getContentTypeAsString(), returnedProperties_0_8.getContentTypeAsString());
+ assertEquals("Property MessageID has changed", props_0_8.getMessageIdAsString(), returnedProperties_0_8.getMessageIdAsString());
+
+ ByteBuffer recoveredContent_0_8 = ByteBuffer.allocate((int) chb_0_8.bodySize) ; // sized from the body size held in the content header
+ long recoveredCount_0_8 = bdbStore.getContent(messageid_0_8, 0, recoveredContent_0_8);
+ assertEquals("Incorrect amount of payload data recovered", chb_0_8.bodySize, recoveredCount_0_8);
+ String returnedPayloadString_0_8 = new String(recoveredContent_0_8.array()); // platform default charset, matching the getBytes() calls above
+ assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_8);
+
+ /*
+ * Read back and validate the 0-10 message metadata and content
+ */
+ StorableMessageMetaData storeableMMD_0_10 = bdbStore.getMessageMetaData(messageid_0_10);
+
+ assertEquals("Unexpected message type",MessageMetaDataType.META_DATA_0_10, storeableMMD_0_10.getType());
+ assertTrue("Unexpected instance type", storeableMMD_0_10 instanceof MessageMetaData_0_10);
+ MessageMetaData_0_10 returnedMMD_0_10 = (MessageMetaData_0_10) storeableMMD_0_10;
+
+ assertEquals("Message arrival time has changed", origArrivalTime_0_10, returnedMMD_0_10.getArrivalTime());
+
+ DeliveryProperties returnedDelProps_0_10 = returnedMMD_0_10.getHeader().get(DeliveryProperties.class);
+ assertNotNull("DeliveryProperties were not returned", returnedDelProps_0_10);
+ assertEquals("Immediate flag has changed", delProps_0_10.getImmediate(), returnedDelProps_0_10.getImmediate());
+ assertEquals("Routing key has changed", delProps_0_10.getRoutingKey(), returnedDelProps_0_10.getRoutingKey());
+ assertEquals("Message exchange has changed", delProps_0_10.getExchange(), returnedDelProps_0_10.getExchange());
+ assertEquals("Message expiration has changed", delProps_0_10.getExpiration(), returnedDelProps_0_10.getExpiration());
+ assertEquals("Message delivery priority has changed", delProps_0_10.getPriority(), returnedDelProps_0_10.getPriority());
+
+ MessageProperties returnedMsgProps = returnedMMD_0_10.getHeader().get(MessageProperties.class);
+ assertNotNull("MessageProperties were not returned", returnedMsgProps);
+ assertTrue("Message correlationID has changed", Arrays.equals(msgProps_0_10.getCorrelationId(), returnedMsgProps.getCorrelationId())); // byte arrays, so compared by content
+ assertEquals("Message content length has changed", msgProps_0_10.getContentLength(), returnedMsgProps.getContentLength());
+ assertEquals("Message content type has changed", msgProps_0_10.getContentType(), returnedMsgProps.getContentType());
+
+ ByteBuffer recoveredContent = ByteBuffer.allocate((int) msgProps_0_10.getContentLength()) ; // sized from the content length set on the original properties
+ long recoveredCount = bdbStore.getContent(messageid_0_10, 0, recoveredContent);
+ assertEquals("Incorrect amount of payload data recovered", msgProps_0_10.getContentLength(), recoveredCount);
+
+ String returnedPayloadString_0_10 = new String(recoveredContent.array()); // platform default charset, matching the getBytes() calls above
+ assertEquals("Message Payload has changed", bodyText, returnedPayloadString_0_10);
+ }
+
+ private DeliveryProperties createDeliveryProperties_0_10()
+ {
+ DeliveryProperties delProps_0_10 = new DeliveryProperties();
+
+ delProps_0_10.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
+ delProps_0_10.setImmediate(true);
+ delProps_0_10.setExchange("exchange12345");
+ delProps_0_10.setRoutingKey("routingKey12345");
+ delProps_0_10.setExpiration(5);
+ delProps_0_10.setPriority(MessageDeliveryPriority.ABOVE_AVERAGE);
+
+ return delProps_0_10;
+ }
+
+ private MessageProperties createMessageProperties_0_10(int bodySize)
+ {
+ MessageProperties msgProps_0_10 = new MessageProperties();
+ msgProps_0_10.setContentLength(bodySize);
+ msgProps_0_10.setCorrelationId("qwerty".getBytes());
+ msgProps_0_10.setContentType("text/html");
+
+ return msgProps_0_10;
+ }
+
+ /**
+ * Close the provided store and create a new (read-only) store to read back the data.
+ *
+ * Use this method instead of reloading the virtual host like other tests in order
+ * to avoid the recovery handler deleting the message for not being on a queue.
+ */
+ private BDBMessageStore reloadStoreReadOnly(BDBMessageStore messageStore) throws Exception
+ {
+ messageStore.close(); // the supplied store is closed before a new one is opened on the same path
+ File storePath = new File(String.valueOf(_config.getProperty("store.environment-path"))); // environment path looked up in _config
+
+ BDBMessageStore newStore = new BDBMessageStore();
+ newStore.configure(storePath, false); // NOTE(review): confirm what the boolean selects; the Javadoc promises a read-only store
+ newStore.start();
+
+ return newStore; // the caller receives the new store; this method does not close it
+ }
+
+ private MessagePublishInfo createPublishInfoBody_0_8()
+ {
+ final MessagePublishInfo fixedInfo = new MessagePublishInfo() // canned values; setExchange is a no-op
+ {
+ public boolean isMandatory()
+ {
+ return true;
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return new AMQShortString("routingKey12345");
+ }
+
+ public AMQShortString getExchange()
+ {
+ return new AMQShortString("exchange12345");
+ }
+
+ public void setExchange(AMQShortString exchange)
+ {
+ }
+ };
+ return fixedInfo;
+ }
+
+ private ContentHeaderBody createContentHeaderBody_0_8(BasicContentHeaderProperties props, int length)
+ {
+ MethodRegistry methodRegistry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+ int classForBasic = methodRegistry.createBasicQosOkBody().getClazz();
+ return new ContentHeaderBody(classForBasic, 1, props, length);
+ }
+
+ private BasicContentHeaderProperties createContentHeaderProperties_0_8()
+ {
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+ props.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue());
+ props.setContentType("text/html");
+ props.getHeaders().setString("Test", "MST");
+ return props;
+ }
+
+ /**
+ * Tests that messages which are added to the store and then removed using the
+ * public MessageStore interfaces are actually removed from the store by then
+ * interrogating the store with its own implementation methods and verifying
+ * expected exceptions are thrown to indicate the message is not present.
+ */
+ public void testMessageCreationAndRemoval() throws Exception
+ {
+ MessageStore genericStore = getVirtualHost().getMessageStore();
+ BDBMessageStore bdbStore = assertBDBStore(genericStore);
+
+ StoredMessage<MessageMetaData> stored = createAndStoreMultiChunkMessage_0_8(genericStore);
+ long removedId = stored.getMessageNumber();
+
+ // delete through the StoredMessage handle, the way the broker normally would
+ stored.remove();
+
+ // from here on, interrogate the BDB implementation directly
+ try
+ {
+ // the metadata lookup for the removed id is expected to throw
+ bdbStore.getMessageMetaData(removedId);
+ fail("No exception thrown when message id not found getting metadata");
+ }
+ catch (AMQStoreException expected)
+ {
+ // expected: the metadata is no longer present
+ }
+
+ // no content is expected, so a one byte destination buffer is sufficient
+ ByteBuffer spareByte = ByteBuffer.allocate(1);
+
+ long contentRead = bdbStore.getContent(removedId, 0, spareByte);
+ assertEquals("Retrieved content when none was expected", 0, contentRead);
+ }
+
+ private BDBMessageStore assertBDBStore(Object store)
+ {
+ if(!(store instanceof BDBMessageStore))
+ {
+ fail("Test requires an instance of BDBMessageStore to proceed");
+ }
+
+ return (BDBMessageStore) store;
+ }
+
+ private StoredMessage<MessageMetaData> createAndStoreMultiChunkMessage_0_8(MessageStore store)
+ {
+ // a ten character chunk followed by a five character chunk
+ ByteBuffer firstChunk = ByteBuffer.wrap("0123456789".getBytes());
+ ByteBuffer secondChunk = ByteBuffer.wrap("01234".getBytes());
+
+ // the content header has to declare the combined size of both chunks
+ int totalSize = firstChunk.limit() + secondChunk.limit();
+ BasicContentHeaderProperties headerProps = createContentHeaderProperties_0_8();
+ ContentHeaderBody headerBody = createContentHeaderBody_0_8(headerProps, totalSize);
+
+ // publish info is the same canned instance used by the other 0-8 tests
+ MessagePublishInfo publishInfo = createPublishInfoBody_0_8();
+
+ // register the metadata through the generic MessageStore interface
+ MessageMetaData metaData = new MessageMetaData(publishInfo, headerBody, 0);
+ StoredMessage<MessageMetaData> stored = store.addMessage(metaData);
+
+ // the second chunk is written at the offset where the first one ends
+ stored.addContent(0, firstChunk);
+ stored.addContent(firstChunk.limit(), secondChunk);
+ stored.flushToStore();
+
+ return stored;
+ }
+
+ /**
+ * Tests transaction commit by utilising the enqueue and dequeue methods available
+ * in the TransactionLog interface implemented by the store, and verifying the
+ * behaviour using BDB implementation methods.
+ */
+ public void testTranCommit() throws Exception
+ {
+ TransactionLog transactionLog = getVirtualHost().getTransactionLog();
+
+ BDBMessageStore bdbStore = assertBDBStore(transactionLog);
+
+ final AMQShortString queueName = new AMQShortString("queueName");
+
+ TransactionLogResource queueResource = new TransactionLogResource()
+ {
+ public String getResourceName()
+ {
+ return queueName.asString();
+ }
+ };
+
+ // enqueue two message ids within a single transaction and commit it
+ TransactionLog.Transaction transaction = transactionLog.newTransaction();
+
+ transaction.enqueueMessage(queueResource, 1L);
+ transaction.enqueueMessage(queueResource, 5L);
+ transaction.commitTran();
+
+ // both ids must be reported for the queue, in the order they were enqueued
+ List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(queueName);
+
+ assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+ assertEquals("First Message is incorrect", 1L, enqueuedIds.get(0).longValue());
+ assertEquals("Second Message is incorrect", 5L, enqueuedIds.get(1).longValue());
+ }
+
+
+ /**
+ * Tests transaction rollback before a commit has occurred by utilising the
+ * enqueue and dequeue methods available in the TransactionLog interface
+ * implemented by the store, and verifying the behaviour using BDB
+ * implementation methods.
+ */
+ public void testTranRollbackBeforeCommit() throws Exception
+ {
+ TransactionLog transactionLog = getVirtualHost().getTransactionLog();
+
+ BDBMessageStore bdbStore = assertBDBStore(transactionLog);
+
+ final AMQShortString queueName = new AMQShortString("queueName");
+
+ TransactionLogResource queueResource = new TransactionLogResource()
+ {
+ public String getResourceName()
+ {
+ return queueName.asString();
+ }
+ };
+
+ // the first transaction is aborted, so id 21 must not be reported later
+ TransactionLog.Transaction abortedTxn = transactionLog.newTransaction();
+ abortedTxn.enqueueMessage(queueResource, 21L);
+ abortedTxn.abortTran();
+
+ // the second transaction is committed with ids 22 and 23
+ TransactionLog.Transaction committedTxn = transactionLog.newTransaction();
+ committedTxn.enqueueMessage(queueResource, 22L);
+ committedTxn.enqueueMessage(queueResource, 23L);
+ committedTxn.commitTran();
+
+ List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(queueName);
+
+ // only the ids from the committed transaction are expected
+ assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+ assertEquals("First Message is incorrect", 22L, enqueuedIds.get(0).longValue());
+ assertEquals("Second Message is incorrect", 23L, enqueuedIds.get(1).longValue());
+ }
+
+ /**
+ * Tests transaction rollback after a commit has occurred by utilising the
+ * enqueue and dequeue methods available in the TransactionLog interface
+ * implemented by the store, and verifying the behaviour using BDB
+ * implementation methods.
+ */
+ public void testTranRollbackAfterCommit() throws Exception
+ {
+ TransactionLog transactionLog = getVirtualHost().getTransactionLog();
+
+ BDBMessageStore bdbStore = assertBDBStore(transactionLog);
+
+ final AMQShortString queueName = new AMQShortString("queueName");
+
+ TransactionLogResource queueResource = new TransactionLogResource()
+ {
+ public String getResourceName()
+ {
+ return queueName.asString();
+ }
+ };
+
+ // first transaction: id 30 is committed
+ TransactionLog.Transaction firstCommit = transactionLog.newTransaction();
+ firstCommit.enqueueMessage(queueResource, 30L);
+ firstCommit.commitTran();
+
+ // second transaction: id 31 is aborted and must not be reported later
+ TransactionLog.Transaction abortedTxn = transactionLog.newTransaction();
+ abortedTxn.enqueueMessage(queueResource, 31L);
+ abortedTxn.abortTran();
+
+ // third transaction: id 32 is committed
+ TransactionLog.Transaction secondCommit = transactionLog.newTransaction();
+ secondCommit.enqueueMessage(queueResource, 32L);
+ secondCommit.commitTran();
+
+ List<Long> enqueuedIds = bdbStore.getEnqueuedMessages(queueName);
+
+ assertEquals("Number of enqueued messages is incorrect", 2, enqueuedIds.size());
+ assertEquals("First Message is incorrect", 30L, enqueuedIds.get(0).longValue());
+ assertEquals("Second Message is incorrect", 32L, enqueuedIds.get(1).longValue());
+ }
+
+}
Added: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgradeTestPreparer.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,232 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
+import junit.framework.TestCase;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.url.URLSyntaxException;
+
+/**
+ * Prepares an older version broker's BDB store with the required
+ * contents for use in the BDBUpgradeTest.
+ *
+ * The store will then be used to verify that the upgrade is
+ * completed properly and that once upgraded it functions as
+ * expected with the new broker.
+ */
+public class BDBStoreUpgradeTestPreparer extends TestCase
+{
+ public static final String TOPIC_NAME="myUpgradeTopic";
+ public static final String SUB_NAME="myDurSubName";
+ public static final String QUEUE_NAME="myUpgradeQueue";
+
+ private static AMQConnectionFactory _connFac;
+ private static final String CONN_URL =
+ "amqp://guest:guest@clientid/test?brokerlist='tcp://localhost:5672'";
+
+ /**
+ * Create a BDBStoreUpgradeTestPreparer instance
+ */
+ public BDBStoreUpgradeTestPreparer () throws URLSyntaxException
+ {
+ _connFac = new AMQConnectionFactory(CONN_URL); // _connFac is static, so each new instance replaces the shared factory
+ }
+
+ /**
+ * Utility test method to allow running the preparation tool
+ * using the test framework
+ */
+ public void testPrepareBroker() throws Exception
+ {
+ prepareBroker(); // same preparation that main() runs
+ }
+
+ private void prepareBroker() throws Exception
+ {
+ prepareQueues(); // queue and its messages first
+ prepareDurableSubscription(); // then the durable subscription and its messages
+ }
+
+ /**
+ * Prepare a queue for use in testing message and binding recovery
+ * after the upgrade is performed.
+ *
+ * - Create a transacted session on the connection.
+ * - Use a consumer to create the (durable by default) queue.
+ * - Send 5 large messages to test (multi-frame) content recovery.
+ * - Send 5 small messages to test (single-frame) content recovery.
+ * - Commit the session.
+ * - Send 5 small messages to test that uncommitted messages are not recovered
+ * following the upgrade.
+ * - Close the session and the connection.
+ */
+ private void prepareQueues() throws Exception
+ {
+ // Create a connection
+ Connection connection = _connFac.createConnection();
+ connection.start();
+ connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ e.printStackTrace(); // asynchronous connection errors are only printed, not rethrown
+ }
+ });
+ // Create a session on the connection, transacted to confirm delivery
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Queue queue = session.createQueue(QUEUE_NAME);
+ // Create a consumer to ensure the queue gets created
+ // (and enter it into the store, as queues are made durable by default)
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+ messageConsumer.close();
+
+ // Create a Message producer
+ MessageProducer messageProducer = session.createProducer(queue);
+
+ // Publish 5 persistent messages, 256k chars to ensure they are multi-frame
+ sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 256*1024, 5);
+ // Publish 5 persistent messages, 1k chars to ensure they are single-frame
+ sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 1*1024, 5);
+
+ session.commit(); // the ten messages sent so far are committed here
+
+ // Publish 5 persistent messages which will NOT be committed and so should be 'lost'
+ sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 1*1024, 5);
+
+ session.close(); // closed without a commit, leaving the last five messages uncommitted
+ connection.close();
+ }
+
+ /**
+ * Prepare a DurableSubscription backing queue for use in testing selector
+ * recovery and queue exclusivity marking during the upgrade process.
+ *
+ * - Create a transacted session on the connection.
+ * - Open and close a DurableSubscription with selector to create the backing queue.
+ * - Send a message which matches the selector.
+ * - Send a message which does not match the selector.
+ * - Send a message which matches the selector but will remain uncommitted.
+ * - Close the publishing session and then the connection.
+ */
+ private void prepareDurableSubscription() throws Exception
+ {
+
+ // Create a connection
+ TopicConnection connection = _connFac.createTopicConnection();
+ connection.start();
+ connection.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(JMSException e)
+ {
+ e.printStackTrace(); // asynchronous connection errors are only printed, not rethrown
+ }
+ });
+ // Create a session on the connection, transacted to confirm delivery
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Topic topic = session.createTopic(TOPIC_NAME);
+
+ // Create and register a durable subscriber with selector and then close it
+ TopicSubscriber durSub1 = session.createDurableSubscriber(topic, SUB_NAME,"testprop='true'", false);
+ durSub1.close();
+
+ // Create a publisher and send a persistent message which matches the selector
+ // followed by one that does not match, and another which matches but is not
+ // committed and so should be 'lost'
+ TopicSession pubSession = connection.createTopicSession(true, Session.SESSION_TRANSACTED);
+ TopicPublisher publisher = pubSession.createPublisher(topic);
+
+ publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true"); // messages are created by the session that owns the publisher
+ publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false");
+ pubSession.commit();
+ publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true");
+
+ publisher.close();
+ pubSession.close(); // closed without a commit, leaving the last message uncommitted
+ connection.close(); // releases the connection together with the subscriber session created on it
+ }
+
+ public static void sendMessages(Session session, MessageProducer messageProducer,
+ Destination dest, int deliveryMode, int length, int numMesages) throws JMSException
+ {
+ for (int sequenceId = 1; sequenceId <= numMesages; sequenceId++)
+ {
+ final Message outgoing = session.createTextMessage(generateString(length)); // a fresh body for every message
+ outgoing.setIntProperty("ID", sequenceId); // ids count up from 1
+ messageProducer.send(outgoing, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+ }
+ }
+
+ public static void publishMessages(Session session, TopicPublisher publisher,
+ Destination dest, int deliveryMode, int length, int numMesages, String selectorProperty) throws JMSException
+ {
+ for (int sequenceId = 1; sequenceId <= numMesages; sequenceId++)
+ {
+ final Message outgoing = session.createTextMessage(generateString(length)); // a fresh body for every message
+ outgoing.setStringProperty("testprop", selectorProperty); // value supplied by the caller
+ outgoing.setIntProperty("ID", sequenceId); // ids count up from 1
+ publisher.publish(outgoing, deliveryMode, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
+ }
+ }
+
+ /**
+ * Generates a string of a given length consisting of the sequence 0,1,2,..,9,0,1,2.
+ *
+ * @param length number of characters in the string
+ * @return string sequence of the given length
+ */
+ public static String generateString(int length)
+ {
+ // The digits repeat with period ten, so position i holds the digit i % 10.
+ final char[] sequence = new char[length];
+ for (int pos = 0; pos < sequence.length; pos++)
+ {
+ sequence[pos] = (char) ('0' + (pos % 10));
+ }
+ return String.valueOf(sequence);
+ }
+
+ /**
+ * Run the preparation tool.
+ * @param args Command line arguments.
+ */
+ public static void main(String[] args) throws Exception
+ {
+ // The arguments are not read; the tool always runs the full preparation.
+ new BDBStoreUpgradeTestPreparer().prepareBroker();
+ }
+}
\ No newline at end of file
Added: qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java?rev=1175235&view=auto
==============================================================================
--- qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java (added)
+++ qpid/trunk/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java Sat Sep 24 20:16:00 2011
@@ -0,0 +1,540 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.QUEUE_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.SUB_NAME;
+import static org.apache.qpid.server.store.berkeleydb.BDBStoreUpgradeTestPreparer.TOPIC_NAME;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.management.common.mbeans.ManagedQueue;
+import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.store.TransactionLog;
+import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.berkeleydb.keys.MessageContentKey_4;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageContentKeyTupleBindingFactory;
+import org.apache.qpid.server.store.berkeleydb.tuples.MessageMetaDataTupleBindingFactory;
+import org.apache.qpid.test.utils.JMXTestUtils;
+import org.apache.qpid.test.utils.QpidBrokerTestCase;
+import org.apache.qpid.util.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.je.DatabaseEntry;
+
+/**
+ * Tests upgrading a BDB store and using it with the new broker
+ * after the required contents are entered into the store using
+ * an old broker with the BDBStoreUpgradeTestPreparer. The store
+ * will then be used to verify that the upgrade is completed
+ * properly and that once upgraded it functions as expected with
+ * the new broker.
+ */
+public class BDBUpgradeTest extends QpidBrokerTestCase
+{
+ protected static final Logger _logger = LoggerFactory.getLogger(BDBUpgradeTest.class);
+
+ private static final String STRING_1024 = BDBStoreUpgradeTestPreparer.generateString(1024);
+ private static final String STRING_1024_256 = BDBStoreUpgradeTestPreparer.generateString(1024*256);
+ private static final String QPID_WORK_ORIG = System.getProperty("QPID_WORK");
+ private static final String QPID_HOME = System.getProperty("QPID_HOME");
+ private static final int VERSION_4 = 4;
+
+ private String _fromDir;
+ private String _toDir;
+ private String _toDirTwice;
+
+    /**
+     * Prepares the upgraded store before the broker is started: resolves the
+     * source and target store locations, wipes any previous upgrade output,
+     * runs the upgrade and finally starts the broker against the result.
+     */
+    @Override
+    public void setUp() throws Exception
+    {
+        assertNotNull("QPID_WORK must be set", QPID_WORK_ORIG);
+        assertNotNull("QPID_HOME must be set", QPID_HOME);
+
+        if (!isExternalBroker())
+        {
+            // append the port to QPID_WORK so the broker finds the store
+            // written by the upgrade tool
+            setSystemProperty("QPID_WORK", QPID_WORK_ORIG + "/" + getPort());
+        }
+
+        _fromDir = QPID_HOME + "/bdbstore-to-upgrade/test-store";
+        _toDir = getWorkDirBaseDir() + "/bdbstore/test-store";
+        _toDirTwice = getWorkDirBaseDir() + "/bdbstore-upgraded-twice";
+
+        // remove whatever a previous run left in either target directory
+        for (String targetPath : new String[] { _toDir, _toDirTwice })
+        {
+            final File target = new File(targetPath);
+            // isDirectory() is already false for a path that does not exist
+            if (target.isDirectory())
+            {
+                FileUtils.delete(target, true);
+            }
+        }
+
+        // produce the upgraded store the broker is going to use
+        upgradeBrokerStore(_fromDir, _toDir);
+
+        // point the broker at the BDB configuration and enable management
+        _configFile = new File(QPID_HOME, "etc/config-systests-bdb.xml");
+        setConfigurationProperty("management.enabled", "true");
+
+        super.setUp();
+    }
+
+ private String getWorkDirBaseDir()
+ {
+ return QPID_WORK_ORIG + (isInternalBroker() ? "" : "/" + getPort());
+ }
+
+    /**
+     * Tests that the core upgrade method of the store upgrade tool passes through the exception
+     * from the BDBMessageStore indicating that the data on disk can't be loaded as the previous
+     * version because it has already been upgraded.
+     *
+     * @throws Exception if stopping the broker fails
+     */
+    public void testMultipleUpgrades() throws Exception
+    {
+        //stop the broker started by setUp() in order to allow the second upgrade attempt to proceed
+        stopBroker();
+
+        try
+        {
+            new BDBStoreUpgrade(_toDir, _toDirTwice, null, false, true).upgradeFromVersion(VERSION_4);
+            fail("Second Upgrade Succeeded");
+        }
+        catch (Exception e)
+        {
+            System.err.println("Showing stack trace, we are expecting an 'Unable to load BDBStore' error");
+            e.printStackTrace();
+
+            // getMessage() may legitimately be null; guard against it so an unexpected
+            // exception is reported as an assertion failure rather than as an NPE
+            final String message = e.getMessage();
+            assertTrue("Incorrect Exception Thrown:" + e,
+                    message != null
+                    && message.contains("Unable to load BDBStore as version 4. Store on disk contains version 5 data"));
+        }
+    }
+
+    /**
+     * Test that the selector applied to the DurableSubscription was successfully
+     * transferred to the new store, and functions as expected with continued use
+     * by monitoring message count while sending new messages to the topic.
+     *
+     * @throws Exception if the JMS or JMX interaction fails unexpectedly
+     */
+    public void testSelectorDurability() throws Exception
+    {
+        JMXTestUtils jmxUtils = null;
+        try
+        {
+            jmxUtils = new JMXTestUtils(this, "guest", "guest");
+            jmxUtils.open();
+        }
+        catch (Exception e)
+        {
+            // keep the cause visible, otherwise the failure cannot be diagnosed
+            _logger.error("Unable to establish JMX connection", e);
+            fail("Unable to establish JMX connection, test cannot proceed: " + e);
+        }
+
+        try
+        {
+            ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME);
+            assertEquals("DurableSubscription backing queue should have 1 message on it initially",
+                    Integer.valueOf(1), dursubQueue.getMessageCount());
+
+            // Create a connection and start it
+            TopicConnection connection = (TopicConnection) getConnection();
+            connection.start();
+
+            // Send messages which don't match and do match the selector, checking message count
+            TopicSession pubSession = connection.createTopicSession(true, org.apache.qpid.jms.Session.SESSION_TRANSACTED);
+            Topic topic = pubSession.createTopic(TOPIC_NAME);
+            TopicPublisher publisher = pubSession.createPublisher(topic);
+
+            // a message which does not match the selector must not reach the backing queue
+            BDBStoreUpgradeTestPreparer.publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "false");
+            pubSession.commit();
+            assertEquals("DurableSubscription backing queue should still have 1 message on it",
+                    Integer.valueOf(1), dursubQueue.getMessageCount());
+
+            // a matching message must be added to the backing queue
+            BDBStoreUpgradeTestPreparer.publishMessages(pubSession, publisher, topic, DeliveryMode.PERSISTENT, 1*1024, 1, "true");
+            pubSession.commit();
+            assertEquals("DurableSubscription backing queue should now have 2 messages on it",
+                    Integer.valueOf(2), dursubQueue.getMessageCount());
+
+            dursubQueue.clearQueue();
+            pubSession.close();
+        }
+        finally
+        {
+            jmxUtils.close();
+        }
+    }
+
+    /**
+     * Test that the backing queue for the durable subscription created was successfully
+     * detected and set as being exclusive during the upgrade process, and that the
+     * regular queue was not.
+     *
+     * @throws Exception if the JMX interaction fails unexpectedly
+     */
+    public void testQueueExclusivity() throws Exception
+    {
+        JMXTestUtils jmxUtils = null;
+        try
+        {
+            jmxUtils = new JMXTestUtils(this, "guest", "guest");
+            jmxUtils.open();
+        }
+        catch (Exception e)
+        {
+            // keep the cause visible, otherwise the failure cannot be diagnosed
+            _logger.error("Unable to establish JMX connection", e);
+            fail("Unable to establish JMX connection, test cannot proceed: " + e);
+        }
+
+        try
+        {
+            ManagedQueue queue = jmxUtils.getManagedQueue(QUEUE_NAME);
+            assertFalse("Queue should not have been marked as Exclusive during upgrade", queue.isExclusive());
+
+            ManagedQueue dursubQueue = jmxUtils.getManagedQueue("clientid" + ":" + SUB_NAME);
+            assertTrue("DurableSubscription backing queue should have been marked as Exclusive during upgrade", dursubQueue.isExclusive());
+        }
+        finally
+        {
+            jmxUtils.close();
+        }
+    }
+
+    /**
+     * Test that the upgraded queue continues to function properly when used
+     * for persistent messaging and restarting the broker.
+     *
+     * Sends the new messages to the queue BEFORE consuming those which were
+     * sent before the upgrade. In doing so, this also serves to test that
+     * the queue bindings were successfully transitioned during the upgrade.
+     *
+     * @throws Exception if the JMS interaction or the broker restart fails
+     */
+    public void testBindingAndMessageDurabability() throws Exception
+    {
+        // Create a connection and start it. Only the generic Connection API is
+        // needed here, so no downcast to TopicConnection is performed.
+        Connection connection = getConnection();
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(QUEUE_NAME);
+        MessageProducer messageProducer = session.createProducer(queue);
+
+        // Send a new message
+        BDBStoreUpgradeTestPreparer.sendMessages(session, messageProducer, queue, DeliveryMode.PERSISTENT, 256*1024, 1);
+
+        session.close();
+
+        // Restart the broker
+        restartBroker();
+
+        // Drain the queue of all messages, including the one sent above
+        connection = getConnection();
+        connection.start();
+        consumeQueueMessages(connection, true);
+    }
+
+    /**
+     * Verifies that the committed persistent messages sent to the broker
+     * before the upgrade can still be consumed after their MetaData and
+     * Content entries were rewritten by the store upgrade.
+     */
+    public void testConsumptionOfUpgradedMessages() throws Exception
+    {
+        final Connection upgradedBrokerConnection = getConnection();
+        upgradedBrokerConnection.start();
+
+        // durable subscription first, then the plain queue without any extra message
+        consumeDurableSubscriptionMessages(upgradedBrokerConnection);
+        consumeQueueMessages(upgradedBrokerConnection, false);
+    }
+
+    /**
+     * Tests migration of a store which contains a message for a queue that
+     * does not exist.
+     *
+     * @throws Exception
+     */
+    public void testMigrationOfMessagesForNonExistingQueues() throws Exception
+    {
+        stopBroker();
+
+        // work on a copy of the original store so the phantom message can be added to it
+        final File originalStore = new File(_fromDir);
+        final File storeCopy = new File(_toDirTwice);
+        if (!storeCopy.exists())
+        {
+            storeCopy.mkdirs();
+        }
+        FileUtils.copyRecursive(originalStore, storeCopy);
+
+        // discard the store produced by the upgrade performed in setUp()
+        final File migratedStore = new File(_toDir);
+        if (migratedStore.isDirectory())
+        {
+            FileUtils.delete(migratedStore, true);
+        }
+
+        // test data
+        final String nonExistingQueueName = getTestQueueName();
+        final String messageText = "Test Phantom Message";
+
+        // plant the message in the copied, not yet upgraded store
+        addMessageForNonExistingQueue(storeCopy, VERSION_4, nonExistingQueueName, messageText);
+
+        // upgrade the copy, answering "Yes" to the questions of the upgrade tool
+        upgradeBrokerStoreInInterractiveMode(_toDirTwice, _toDir, new String[] { "Yes", "Yes", "Yes" });
+
+        startBroker();
+
+        final Connection connection = getConnection();
+        connection.start();
+
+        // try to consume the message from the queue it was addressed to
+        final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Queue queue = session.createQueue(nonExistingQueueName);
+        final MessageConsumer messageConsumer = session.createConsumer(queue);
+        final Message message = messageConsumer.receive(1000);
+
+        // the message must have survived the migration unchanged
+        assertNotNull("Message was not migrated!", message);
+        assertTrue("Unexpected message received!", message instanceof TextMessage);
+        final String text = ((TextMessage) message).getText();
+        assertEquals("Message migration failed!", messageText, text);
+    }
+
+    /**
+     * A utility method to upgrade the broker store while simulating user interaction.
+     *
+     * System.in is temporarily replaced by a stream which hands out the given
+     * answers, one answer (terminated by a newline) per bulk read, and reports
+     * end-of-stream once all answers have been consumed. The original System.in
+     * is restored afterwards, even if the upgrade fails.
+     *
+     * @param fromDir
+     *            location of the store to migrate
+     * @param toDir
+     *            location of where migrated data will be stored
+     * @param inputs
+     *            user answers to the upgrade tool questions, in the order they are asked
+     * @throws Exception if the upgrade fails
+     */
+    private void upgradeBrokerStoreInInterractiveMode(String fromDir, String toDir, final String[] inputs)
+            throws Exception
+    {
+        // save to restore system.in after data migration
+        InputStream stdin = System.in;
+
+        // set fake system in to simulate user interactions
+        System.setIn(new InputStream()
+        {
+            private int _nextAnswer = 0;
+            private byte[] _pending = new byte[0];
+            private int _pendingOffset = 0;
+
+            @Override
+            public synchronized int read(byte[] b, int off, int len)
+            {
+                if (len == 0)
+                {
+                    return 0;
+                }
+
+                if (_pendingOffset >= _pending.length)
+                {
+                    if (_nextAnswer >= inputs.length)
+                    {
+                        // all answers consumed: signal end of stream instead of
+                        // failing with an ArrayIndexOutOfBoundsException
+                        return -1;
+                    }
+                    // platform charset on purpose: System.in is normally decoded with it
+                    _pending = (inputs[_nextAnswer++] + "\n").getBytes();
+                    _pendingOffset = 0;
+                }
+
+                // never hand out more than one answer per call (each prompt may use its
+                // own reader) and never write more than the caller asked for
+                int count = Math.min(len, _pending.length - _pendingOffset);
+                System.arraycopy(_pending, _pendingOffset, b, off, count);
+                _pendingOffset += count;
+                return count;
+            }
+
+            @Override
+            public synchronized int read() throws IOException
+            {
+                // delegate to the bulk read so both variants see the same data
+                byte[] single = new byte[1];
+                return read(single, 0, 1) < 0 ? -1 : (single[0] & 0xFF);
+            }
+        });
+
+        try
+        {
+            // Upgrade the test store.
+            new BDBStoreUpgrade(fromDir, toDir, null, true, true).upgradeFromVersion(VERSION_4);
+        }
+        finally
+        {
+            // restore system in
+            System.setIn(stdin);
+        }
+    }
+
+    /**
+     * Writes a single persistent text message directly into the store at the
+     * given location: a content entry, a meta data entry and an enqueue record
+     * for a queue of the given name. The store is closed again afterwards.
+     *
+     * @param storeLocation directory of the store to modify
+     * @param storeVersion version of the store; only VERSION_4 is supported
+     * @param nonExistingQueueName name of the queue the message is enqueued on
+     * @param messageText body of the message
+     * @throws Exception if the store version is not supported or the store access fails
+     */
+    @SuppressWarnings("unchecked")
+    private void addMessageForNonExistingQueue(File storeLocation, int storeVersion, String nonExistingQueueName,
+            String messageText) throws Exception
+    {
+        final AMQShortString queueName = new AMQShortString(nonExistingQueueName);
+        final BDBMessageStore store = new BDBMessageStore(storeVersion);
+        store.configure(storeLocation, false);
+        try
+        {
+            store.start();
+
+            // build the message body, publish info and content header
+            final ByteBuffer body = ByteBuffer.wrap(messageText.getBytes("UTF-8"));
+            final long bodySize = body.limit();
+            final MessagePublishInfo publishInfo = new MessagePublishInfoImpl(new AMQShortString("amq.direct"), false,
+                    false, queueName);
+            final BasicContentHeaderProperties headerProperties = new BasicContentHeaderProperties();
+            headerProperties.setDeliveryMode(Integer.valueOf(BasicContentHeaderProperties.PERSISTENT).byteValue());
+            headerProperties.setContentType("text/plain");
+            headerProperties.setType("text/plain");
+            headerProperties.setMessageId("whatever");
+            headerProperties.setEncoding("UTF-8");
+            headerProperties.getHeaders().setString("Test", "MST");
+            final MethodRegistry registry = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+            final int basicClassId = registry.createBasicQosOkBody().getClazz();
+            final ContentHeaderBody contentHeader = new ContentHeaderBody(basicClassId, 1, headerProperties, bodySize);
+
+            // content entry
+            final long messageId = store.getNewMessageId();
+            final TupleBinding<MessageContentKey> contentKeyBinding =
+                    new MessageContentKeyTupleBindingFactory(storeVersion).getInstance();
+            if (storeVersion != VERSION_4)
+            {
+                throw new Exception(storeVersion + " is not supported");
+            }
+            final MessageContentKey contentKey = new MessageContentKey_4(messageId, 0);
+            final DatabaseEntry contentKeyEntry = new DatabaseEntry();
+            contentKeyBinding.objectToEntry(contentKey, contentKeyEntry);
+            final DatabaseEntry contentValueEntry = new DatabaseEntry();
+            final ContentTB contentBinding = new ContentTB();
+            contentBinding.objectToEntry(body, contentValueEntry);
+            store.getContentDb().put(null, contentKeyEntry, contentValueEntry);
+
+            // meta data entry, keyed by the message id
+            final TupleBinding<Long> idBinding = TupleBinding.getPrimitiveBinding(Long.class);
+            final TupleBinding<Object> metaDataBinding = new MessageMetaDataTupleBindingFactory(storeVersion).getInstance();
+            final DatabaseEntry metaDataKeyEntry = new DatabaseEntry();
+            final DatabaseEntry metaDataValueEntry = new DatabaseEntry();
+            idBinding.objectToEntry(Long.valueOf(messageId), metaDataKeyEntry);
+            final MessageMetaData metaData = new MessageMetaData(publishInfo, contentHeader, 1);
+            metaDataBinding.objectToEntry(metaData, metaDataValueEntry);
+            store.getMetaDataDb().put(null, metaDataKeyEntry, metaDataValueEntry);
+
+            // delivery entry: enqueue the message on a stand-in for the queue
+            final TransactionLogResource queueStandIn = new TransactionLogResource()
+            {
+                public String getResourceName()
+                {
+                    return queueName.asString();
+                }
+            };
+            final TransactionLog.Transaction transaction = ((TransactionLog) store).newTransaction();
+            transaction.enqueueMessage(queueStandIn, messageId);
+            transaction.commitTran();
+        }
+        finally
+        {
+            // always release the store again
+            store.close();
+        }
+    }
+
+    /**
+     * Consumes from the durable subscription using the selector "testprop='true'"
+     * and verifies that exactly one message, with ID 1 and the 1024 character
+     * body, is delivered.
+     *
+     * @param connection started connection used to create the session
+     * @throws Exception if the JMS interaction fails
+     */
+    private void consumeDurableSubscriptionMessages(Connection connection) throws Exception
+    {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic(TOPIC_NAME);
+
+        TopicSubscriber durSub = session.createDurableSubscriber(topic, SUB_NAME, "testprop='true'", false);
+
+        // Retrieve the matching message
+        Message m = durSub.receive(2000);
+        assertNotNull("Failed to receive an expected message", m);
+        assertEquals("Selector property did not match", "true", m.getStringProperty("testprop"));
+        assertEquals("ID property did not match", 1, m.getIntProperty("ID"));
+        // use the shared constant rather than regenerating the 1024 character string
+        assertEquals("Message content was not as expected", STRING_1024, ((TextMessage) m).getText());
+
+        // Verify that neither the non-matching nor the uncommitted message is received
+        m = durSub.receive(1000);
+        assertNull("No more messages should have been received", m);
+
+        durSub.close();
+        session.close();
+    }
+
+    /**
+     * Drains the test queue and verifies its content: five messages with the
+     * 256KB body (IDs 1 to 5), followed by five messages with the 1KB body
+     * (IDs 1 to 5), optionally followed by one extra 256KB message with ID 1,
+     * and nothing after that.
+     *
+     * @param connection started connection used to create the session
+     * @param extraMessage whether one additional message is expected after the
+     *            pre-upgrade messages
+     * @throws Exception if the JMS interaction fails
+     */
+    private void consumeQueueMessages(Connection connection, boolean extraMessage) throws Exception
+    {
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(QUEUE_NAME);
+
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        // Retrieve the initial pre-upgrade messages
+        for (int i = 1; i <= 5; i++)
+        {
+            assertNextQueueMessage(consumer, i, STRING_1024_256);
+        }
+        for (int i = 1; i <= 5; i++)
+        {
+            assertNextQueueMessage(consumer, i, STRING_1024);
+        }
+
+        if (extraMessage)
+        {
+            //verify that the extra message is received
+            assertNextQueueMessage(consumer, 1, STRING_1024_256);
+        }
+
+        // Verify that no more messages are received
+        Message m = consumer.receive(1000);
+        assertNull("No more messages should have been received", m);
+
+        consumer.close();
+        session.close();
+    }
+
+    /**
+     * Receives the next message from the consumer and asserts its "ID" property and text body.
+     */
+    private void assertNextQueueMessage(MessageConsumer consumer, int expectedId, String expectedText) throws Exception
+    {
+        Message m = consumer.receive(2000);
+        assertNotNull("Failed to receive an expected message", m);
+        assertEquals("ID property did not match", expectedId, m.getIntProperty("ID"));
+        assertEquals("Message content was not as expected", expectedText, ((TextMessage) m).getText());
+    }
+
+    /**
+     * Runs the store upgrade tool from version 4 without user interaction.
+     *
+     * @param fromDir location of the store to upgrade
+     * @param toDir location the upgraded store is written to
+     * @throws Exception if the upgrade fails
+     */
+    private void upgradeBrokerStore(String fromDir, String toDir) throws Exception
+    {
+        // honour the arguments; previously the _fromDir/_toDir fields were used
+        // regardless of what the caller passed in
+        new BDBStoreUpgrade(fromDir, toDir, null, false, true).upgradeFromVersion(VERSION_4);
+    }
+}
Added: qpid/trunk/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb?rev=1175235&view=auto
==============================================================================
Files qpid/trunk/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb (added) and qpid/trunk/qpid/java/bdbstore/src/test/resources/upgrade/bdbstore-to-upgrade/test-store/00000000.jdb Sat Sep 24 20:16:00 2011 differ
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org