You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/04/19 19:56:23 UTC
svn commit: r530500 - in /incubator/qpid/trunk/qpid/cpp/src: ./ qpid/broker/
tests/
Author: gsim
Date: Thu Apr 19 10:56:21 2007
New Revision: 530500
URL: http://svn.apache.org/viewvc?view=rev&rev=530500
Log:
Some dtx related updates.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h
incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=530500&r1=530499&r2=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu Apr 19 10:56:21 2007
@@ -112,6 +112,10 @@
qpid/broker/DeliverableMessage.cpp \
qpid/broker/DeliveryRecord.cpp \
qpid/broker/DirectExchange.cpp \
+ qpid/broker/DtxBuffer.cpp \
+ qpid/broker/DtxHandlerImpl.cpp \
+ qpid/broker/DtxManager.cpp \
+ qpid/broker/DtxWorkRecord.cpp \
qpid/broker/ExchangeRegistry.cpp \
qpid/broker/FanOutExchange.cpp \
qpid/broker/HeadersExchange.cpp \
@@ -161,6 +165,10 @@
qpid/broker/Deliverable.h \
qpid/broker/DeliverableMessage.h \
qpid/broker/DirectExchange.h \
+ qpid/broker/DtxBuffer.h \
+ qpid/broker/DtxHandlerImpl.h \
+ qpid/broker/DtxManager.h \
+ qpid/broker/DtxWorkRecord.h \
qpid/broker/ExchangeRegistry.h \
qpid/broker/FanOutExchange.h \
qpid/broker/HandlerImpl.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?view=diff&rev=530500&r1=530499&r2=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu Apr 19 10:56:21 2007
@@ -53,7 +53,8 @@
timeout(30000),
stagingThreshold(0),
cleaner(&queues, timeout/10),
- factory(*this)
+ factory(*this),
+ dtxManager(store.get())
{
exchanges.declare(empty, DirectExchange::typeName); // Default exchange.
exchanges.declare(amq_direct, DirectExchange::typeName);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?view=diff&rev=530500&r1=530499&r2=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Thu Apr 19 10:56:21 2007
@@ -32,6 +32,7 @@
#include "ExchangeRegistry.h"
#include "ConnectionToken.h"
#include "DirectExchange.h"
+#include "DtxManager.h"
#include "qpid/framing/OutputHandler.h"
#include "qpid/framing/ProtocolInitiation.h"
#include "QueueRegistry.h"
@@ -83,6 +84,7 @@
uint32_t getTimeout() { return timeout; }
uint64_t getStagingThreshold() { return stagingThreshold; }
AutoDelete& getCleaner() { return cleaner; }
+ DtxManager& getDtxManager() { return dtxManager; }
private:
Broker(const Configuration& config);
@@ -97,6 +99,7 @@
uint64_t stagingThreshold;
AutoDelete cleaner;
ConnectionFactory factory;
+ DtxManager dtxManager;
static MessageStore* createStore(const Configuration& config);
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp?view=diff&rev=530500&r1=530499&r2=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.cpp Thu Apr 19 10:56:21 2007
@@ -42,7 +42,8 @@
exchangeHandler(*this),
messageHandler(*this),
queueHandler(*this),
- txHandler(*this)
+ txHandler(*this),
+ dtxHandler(*this)
{}
@@ -336,7 +337,7 @@
}
void BrokerAdapter::TxHandlerImpl::select(const MethodContext& context){
- channel.begin();
+ channel.startTx();
client.selectOk(context.getRequestId());
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h?view=diff&rev=530500&r1=530499&r2=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerAdapter.h Thu Apr 19 10:56:21 2007
@@ -21,6 +21,7 @@
#include "qpid/framing/AMQP_ServerOperations.h"
#include "HandlerImpl.h"
#include "MessageHandlerImpl.h"
+#include "DtxHandlerImpl.h"
#include "qpid/Exception.h"
namespace qpid {
@@ -76,6 +77,9 @@
TunnelHandler* getTunnelHandler() {
throw ConnectionException(540, "Tunnel class not implemented"); }
+ DtxCoordinationHandler* getDtxCoordinationHandler() { return &dtxHandler; }
+ DtxDemarcationHandler* getDtxDemarcationHandler() { return &dtxHandler; }
+
framing::AMQP_ClientProxy& getProxy() { return proxy; }
private:
@@ -213,7 +217,7 @@
MessageHandlerImpl messageHandler;
QueueHandlerImpl queueHandler;
TxHandlerImpl txHandler;
-
+ DtxHandlerImpl dtxHandler;
};
}} // namespace qpid::broker
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp?view=diff&rev=530500&r1=530499&r2=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp Thu Apr 19 10:56:21 2007
@@ -54,7 +54,6 @@
ChannelAdapter(id, &con.getOutput(), con.getVersion()),
connection(con),
currentDeliveryTag(1),
- transactional(false),
prefetchSize(0),
prefetchCount(0),
framesize(_framesize),
@@ -104,24 +103,38 @@
recover(true);
}
-void Channel::begin(){
- transactional = true;
+void Channel::startTx(){
+ txBuffer = TxBuffer::shared_ptr(new TxBuffer());
}
void Channel::commit(){
TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
- txBuffer.enlist(txAck);
- if(txBuffer.prepare(store)){
- txBuffer.commit();
+ txBuffer->enlist(txAck);
+ if (txBuffer->commitLocal(store)) {
+ accumulatedAck.clear();
}
- accumulatedAck.clear();
}
void Channel::rollback(){
- txBuffer.rollback();
+ txBuffer->rollback();
accumulatedAck.clear();
}
+/**
+ * Opens a new dtx branch on this channel for the given xid.
+ *
+ * A fresh DtxBuffer becomes both the channel's dtx buffer and its current
+ * transaction buffer, and is registered with the manager under `xid`.
+ */
+void Channel::startDtx(const std::string& xid, DtxManager& mgr)
+{
+    DtxBuffer::shared_ptr branch(new DtxBuffer());
+    dtxBuffer = branch;
+    // the same object, viewed through its TxBuffer base
+    txBuffer = static_pointer_cast<TxBuffer>(branch);
+    mgr.start(xid, branch);
+}
+
+/**
+ * Ends the dtx branch currently open on this channel.
+ *
+ * The acks accumulated so far are enlisted in the branch as a TxAck, the
+ * branch is marked as ended, and the channel drops its references to the
+ * buffer.
+ *
+ * @throws ConnectionException (503) if no dtx branch has been started on
+ *         this channel; without this check a stray end request would
+ *         dereference a null buffer pointer.
+ */
+void Channel::endDtx()
+{
+    if (!dtxBuffer) {
+        throw ConnectionException(503, "dtx end received but no dtx branch has been started on this channel");
+    }
+
+    TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
+    dtxBuffer->enlist(txAck);
+    dtxBuffer->markEnded();
+
+    dtxBuffer.reset();
+    txBuffer.reset();
+}
+
void Channel::deliver(
Message::shared_ptr& msg, const string& consumerTag,
Queue::shared_ptr& queue, bool ackExpected)
@@ -180,23 +193,8 @@
queue->dispatch();
}
-void Channel::handleInlineTransfer(Message::shared_ptr msg)
-{
- Exchange::shared_ptr exchange =
- connection.broker.getExchanges().get(msg->getExchange());
- if(transactional){
- TxPublish* deliverable(new TxPublish(msg));
- TxOp::shared_ptr op(deliverable);
- exchange->route(
- *deliverable, msg->getRoutingKey(),
- &(msg->getApplicationHeaders()));
- txBuffer.enlist(op);
- }else{
- DeliverableMessage deliverable(msg);
- exchange->route(
- deliverable, msg->getRoutingKey(),
- &(msg->getApplicationHeaders()));
- }
+void Channel::handleInlineTransfer(Message::shared_ptr msg){
+ complete(msg);
}
void Channel::handlePublish(Message* _message){
@@ -222,12 +220,12 @@
Exchange::shared_ptr exchange =
connection.broker.getExchanges().get(msg->getExchange());
assert(exchange.get());
- if(transactional) {
+ if (txBuffer) {
TxPublish* deliverable(new TxPublish(msg));
TxOp::shared_ptr op(deliverable);
exchange->route(*deliverable, msg->getRoutingKey(),
&(msg->getApplicationHeaders()));
- txBuffer.enlist(op);
+ txBuffer->enlist(op);
} else {
DeliverableMessage deliverable(msg);
exchange->route(deliverable, msg->getRoutingKey(),
@@ -236,24 +234,24 @@
}
void Channel::ack(){
- ack(getFirstAckRequest(), getLastAckRequest());
+ ack(getFirstAckRequest(), getLastAckRequest());
}
// Used by Basic
void Channel::ack(uint64_t deliveryTag, bool multiple){
- if (multiple)
- ack(0, deliveryTag);
- else
- ack(deliveryTag, deliveryTag);
+ if (multiple)
+ ack(0, deliveryTag);
+ else
+ ack(deliveryTag, deliveryTag);
}
void Channel::ack(uint64_t firstTag, uint64_t lastTag){
- if(transactional){
+ if (txBuffer) {
accumulatedAck.update(firstTag, lastTag);
//TODO: I think the outstanding prefetch size & count should be updated at this point...
//TODO: ...this may then necessitate dispatching to consumers
- }else{
+ } else {
Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), lastTag));
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h?view=diff&rev=530500&r1=530499&r2=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h Thu Apr 19 10:56:21 2007
@@ -31,6 +31,8 @@
#include "AccumulatedAck.h"
#include "Consumer.h"
#include "DeliveryRecord.h"
+#include "DtxBuffer.h"
+#include "DtxManager.h"
#include "MessageBuilder.h"
#include "NameGenerator.h"
#include "Prefetch.h"
@@ -80,7 +82,6 @@
Connection& connection;
uint64_t currentDeliveryTag;
Queue::shared_ptr defaultQueue;
- bool transactional;
ConsumerImplMap consumers;
uint32_t prefetchSize;
uint16_t prefetchCount;
@@ -89,7 +90,8 @@
NameGenerator tagGenerator;
std::list<DeliveryRecord> unacked;
sys::Mutex deliveryLock;
- TxBuffer txBuffer;
+ TxBuffer::shared_ptr txBuffer;
+ DtxBuffer::shared_ptr dtxBuffer;
AccumulatedAck accumulatedAck;
MessageStore* const store;
MessageBuilder messageBuilder;//builder for in-progress message
@@ -113,7 +115,7 @@
~Channel();
bool isOpen() const { return opened; }
- BrokerAdapter& getAdatper() { return *adapter; }
+ BrokerAdapter& getAdapter() { return *adapter; }
void open() { opened = true; }
void setDefaultQueue(Queue::shared_ptr queue){ defaultQueue = queue; }
@@ -131,10 +133,12 @@
const framing::FieldTable* = 0);
void cancel(const string& tag);
bool get(Queue::shared_ptr queue, const std::string& destination, bool ackExpected);
- void begin();
void close();
+ void startTx();
void commit();
void rollback();
+ void startDtx(const std::string& xid, DtxManager& mgr);
+ void endDtx();
void ack();
void ack(uint64_t deliveryTag, bool multiple);
void ack(uint64_t deliveryTag, uint64_t endTag);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp?view=diff&rev=530500&r1=530499&r2=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Connection.cpp Thu Apr 19 10:56:21 2007
@@ -82,7 +82,7 @@
string mechanisms("PLAIN");
string locales("en_US");
getChannel(0).init(0, *out, getVersion());
- client = &getChannel(0).getAdatper().getProxy().getConnection();
+ client = &getChannel(0).getAdapter().getProxy().getConnection();
client->start(
header.getMajor(), header.getMinor(),
properties, mechanisms, locales);
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp?view=auto&rev=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp Thu Apr 19 10:56:21 2007
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "DtxBuffer.h"
+
+using namespace qpid::broker;
+using qpid::sys::Mutex;
+
+// A new buffer starts out not ended.
+DtxBuffer::DtxBuffer() : ended(false) {}
+
+DtxBuffer::~DtxBuffer() {}
+
+// Sets the 'ended' flag while holding the buffer's lock.
+void DtxBuffer::markEnded()
+{
+    Mutex::ScopedLock guard(lock);
+    ended = true;
+}
+
+// Reads the 'ended' flag while holding the buffer's lock.
+bool DtxBuffer::isEnded()
+{
+    Mutex::ScopedLock guard(lock);
+    const bool snapshot = ended;
+    return snapshot;
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h?view=auto&rev=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h Thu Apr 19 10:56:21 2007
@@ -0,0 +1,44 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _DtxBuffer_
+#define _DtxBuffer_
+
+#include "TxBuffer.h"
+#include "qpid/sys/Mutex.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * A TxBuffer that additionally carries an 'ended' flag, used to represent
+ * one branch of a distributed transaction.
+ */
+class DtxBuffer : public TxBuffer
+{
+public:
+    typedef boost::shared_ptr<DtxBuffer> shared_ptr;
+
+    DtxBuffer();
+    ~DtxBuffer();
+
+    /** Flag this branch as ended. */
+    void markEnded();
+
+    /** @return whether markEnded() has been called on this buffer. */
+    bool isEnded();
+
+private:
+    sys::Mutex lock;
+    bool ended;
+};
+
+} // namespace broker
+} // namespace qpid
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp?view=auto&rev=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp Thu Apr 19 10:56:21 2007
@@ -0,0 +1,124 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+#include "DtxHandlerImpl.h"
+
+#include "Broker.h"
+#include "BrokerChannel.h"
+
+using namespace qpid::broker;
+using qpid::framing::FieldTable;
+using qpid::framing::MethodContext;
+using std::string;
+
+// Initialises the CoreRefs base by copying the references held by `parent`.
+DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) : CoreRefs(parent) {}
+
+
+// DtxDemarcationHandler:
+
+/**
+ * Ends the dtx branch currently open on the channel by calling
+ * Channel::endDtx().
+ *
+ * All arguments are currently ignored and no end-ok response is sent yet
+ * (see the TODOs in the body).
+ */
+void DtxHandlerImpl::end(const MethodContext& /*context*/,
+ u_int16_t /*ticket*/,
+ const string& /*xid*/,
+ bool /*fail*/,
+ bool /*suspend*/ )
+{
+ channel.endDtx();
+ //send end-ok
+ //TODO: handle fail and suspend
+ //TODO: check xid is as expected?
+}
+
+
+/**
+ * Placeholder for dtx select: currently a no-op; the select-ok response is
+ * not sent yet.
+ */
+void DtxHandlerImpl::select(const MethodContext& /*context*/ )
+{
+ //don't need to do anything here really
+ //send select-ok
+}
+
+
+/**
+ * Starts a dtx branch for `xid` on the channel, registering it with the
+ * broker's DtxManager.
+ *
+ * The ticket, join and resume arguments are currently ignored and no
+ * start-ok response is sent yet.
+ */
+void DtxHandlerImpl::start(const MethodContext& /*context*/,
+                           u_int16_t /*ticket*/,
+                           const string& xid,
+                           bool /*join*/,
+                           bool /*resume*/ )
+{
+    DtxManager& manager = broker.getDtxManager();
+    channel.startDtx(xid, manager);
+    //send start-ok
+    //TODO: handle join and resume
+}
+
+// DtxCoordinationHandler:
+
+/**
+ * Asks the broker's DtxManager to prepare the transaction identified by
+ * `xid`. The ticket is ignored and no prepare-ok response is sent yet.
+ */
+void DtxHandlerImpl::prepare(const MethodContext& /*context*/,
+                             u_int16_t /*ticket*/,
+                             const string& xid )
+{
+    DtxManager& manager = broker.getDtxManager();
+    manager.prepare(xid);
+    //send prepare-ok
+}
+
+/**
+ * Asks the broker's DtxManager to commit the transaction identified by
+ * `xid`. The ticket and onePhase flag are ignored and no commit-ok
+ * response is sent yet.
+ */
+void DtxHandlerImpl::commit(const MethodContext& /*context*/,
+                            u_int16_t /*ticket*/,
+                            const string& xid,
+                            bool /*onePhase*/ )
+{
+    DtxManager& manager = broker.getDtxManager();
+    manager.commit(xid);
+    //send commit-ok
+    //TODO use onePhase flag to validate correct sequence
+}
+
+
+/**
+ * Asks the broker's DtxManager to roll back the transaction identified by
+ * `xid`. The ticket is ignored and no rollback-ok response is sent yet.
+ */
+void DtxHandlerImpl::rollback(const MethodContext& /*context*/,
+                              u_int16_t /*ticket*/,
+                              const string& xid )
+{
+    DtxManager& manager = broker.getDtxManager();
+    manager.rollback(xid);
+    //send rollback-ok
+}
+
+// Not implemented yet: accepts the request and does nothing.
+void DtxHandlerImpl::recover(const MethodContext& /*context*/,
+ u_int16_t /*ticket*/,
+ bool /*startscan*/,
+ u_int32_t /*endscan*/ )
+{
+ //TODO
+}
+
+// Not implemented yet: accepts the request and does nothing.
+void DtxHandlerImpl::forget(const MethodContext& /*context*/,
+ u_int16_t /*ticket*/,
+ const string& /*xid*/ )
+{
+ //TODO
+}
+
+// Not implemented yet: accepts the request and does nothing.
+void DtxHandlerImpl::getTimeout(const MethodContext& /*context*/,
+ const string& /*xid*/ )
+{
+ //TODO
+}
+
+
+// Not implemented yet: accepts the request and does nothing.
+void DtxHandlerImpl::setTimeout(const MethodContext& /*context*/,
+ u_int16_t /*ticket*/,
+ const string& /*xid*/,
+ u_int32_t /*timeout*/ )
+{
+ //TODO
+}
+
+
+
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h?view=auto&rev=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h Thu Apr 19 10:56:21 2007
@@ -0,0 +1,98 @@
+#ifndef _broker_DtxHandlerImpl_h
+#define _broker_DtxHandlerImpl_h
+
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
+#include "HandlerImpl.h"
+
+namespace qpid {
+namespace broker {
+
+/* dummy interfaces until real ones are generated from published spec */
+/* (both are empty placeholder types, so the methods below override nothing yet) */
+class DtxCoordinationHandler{};
+class DtxDemarcationHandler{};
+
+/**
+ * Broker-side handler for the dtx coordination and demarcation methods.
+ *
+ * Derives from CoreRefs and from the two placeholder handler interfaces
+ * above; the commented-out qualifiers on the base classes indicate where
+ * the generated interfaces are expected to come from.
+ */
+class DtxHandlerImpl
+ : public CoreRefs,
+ public /*framing::AMQP_ServerOperations::*/DtxCoordinationHandler,
+ public /*framing::AMQP_ServerOperations::*/DtxDemarcationHandler
+{
+public:
+ DtxHandlerImpl(CoreRefs& parent);
+
+ // DtxCoordinationHandler:
+
+ void commit(const framing::MethodContext& context,
+ u_int16_t ticket,
+ const std::string& xid,
+ bool onePhase );
+
+ void forget(const framing::MethodContext& context,
+ u_int16_t ticket,
+ const std::string& xid );
+
+ void getTimeout(const framing::MethodContext& context,
+ const std::string& xid );
+
+ void prepare(const framing::MethodContext& context,
+ u_int16_t ticket,
+ const std::string& xid );
+
+ void recover(const framing::MethodContext& context,
+ u_int16_t ticket,
+ bool startscan,
+ u_int32_t endscan );
+
+ // NOTE(review): no definition of recoverOk appears in DtxHandlerImpl.cpp
+ // in this commit; calling it would fail at link time - confirm whether it
+ // should be implemented or removed.
+ void recoverOk(const framing::MethodContext& context,
+ const framing::FieldTable& xids );
+
+ void rollback(const framing::MethodContext& context,
+ u_int16_t ticket,
+ const std::string& xid );
+
+ void setTimeout(const framing::MethodContext& context,
+ u_int16_t ticket,
+ const std::string& xid,
+ u_int32_t timeout );
+
+ // DtxDemarcationHandler:
+
+ void end(const framing::MethodContext& context,
+ u_int16_t ticket,
+ const std::string& xid,
+ bool fail,
+ bool suspend );
+
+ void select(const framing::MethodContext& context );
+
+ void start(const framing::MethodContext& context,
+ u_int16_t ticket,
+ const std::string& xid,
+ bool join,
+ bool resume );
+};
+
+
+}} // namespace qpid::broker
+
+
+
+#endif /*!_broker_DtxHandlerImpl_h*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp?view=auto&rev=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp Thu Apr 19 10:56:21 2007
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "DtxManager.h"
+#include <boost/format.hpp>
+
+using namespace qpid::broker;
+
+// Stores the given store pointer; it is handed to every DtxWorkRecord created by start().
+DtxManager::DtxManager(TransactionalStore* const _store) : store(_store) {}
+
+DtxManager::~DtxManager() {}
+
+/**
+ * Adds a branch buffer to the work record for `xid`, creating the record
+ * first if none exists for that xid yet.
+ */
+void DtxManager::start(std::string xid, DtxBuffer::shared_ptr ops)
+{
+ WorkMap::iterator i = work.find(xid);
+ if (i == work.end()) {
+ // NOTE(review): xid is presumably passed by value because ptr_map::insert
+ // takes a non-const key reference - confirm against the Boost docs.
+ i = work.insert(xid, new DtxWorkRecord(xid, store)).first;
+ }
+ // NOTE(review): relies on the ptr_map iterator dereferencing directly to the
+ // DtxWorkRecord; verify against the Boost version in use.
+ i->add(ops);
+}
+
+/**
+ * Prepares the work recorded for `xid`.
+ * getWork() throws ConnectionException (503) if the xid is unknown.
+ */
+void DtxManager::prepare(const std::string& xid)
+{
+    WorkMap::iterator record = getWork(xid);
+    record->prepare();
+}
+
+/**
+ * Commits the work recorded for `xid` and then forgets the xid.
+ *
+ * getWork() throws ConnectionException (503) if the xid is unknown. If the
+ * record's commit() throws, the record is left in place.
+ */
+void DtxManager::commit(const std::string& xid)
+{
+    WorkMap::iterator record = getWork(xid);
+    record->commit();
+    // The transaction is finished: drop its record so the map does not grow
+    // without bound and a later start() with the same xid begins from a
+    // clean record instead of appending to already-committed work.
+    work.erase(record);
+}
+
+/**
+ * Rolls back the work recorded for `xid` and then forgets the xid.
+ *
+ * getWork() throws ConnectionException (503) if the xid is unknown. If the
+ * record's rollback() throws, the record is left in place.
+ */
+void DtxManager::rollback(const std::string& xid)
+{
+    WorkMap::iterator record = getWork(xid);
+    record->rollback();
+    // The transaction is finished: drop its record so the map does not grow
+    // without bound and a later start() with the same xid begins from a
+    // clean record instead of appending to already-rolled-back work.
+    work.erase(record);
+}
+
+/**
+ * Looks up the work record for `xid`.
+ *
+ * @return iterator to the map entry for the xid.
+ * @throws ConnectionException (503) if no record exists for the xid.
+ */
+DtxManager::WorkMap::iterator DtxManager::getWork(const std::string& xid)
+{
+    WorkMap::iterator found = work.find(xid);
+    if (found != work.end()) {
+        return found;
+    }
+    throw ConnectionException(503, boost::format("Unrecognised xid %1%!") % xid);
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h?view=auto&rev=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h Thu Apr 19 10:56:21 2007
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _DtxManager_
+#define _DtxManager_
+
+#include <boost/ptr_container/ptr_map.hpp>
+#include "DtxBuffer.h"
+#include "DtxWorkRecord.h"
+#include "TransactionalStore.h"
+#include "qpid/framing/amqp_types.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Keeps one DtxWorkRecord per xid and forwards prepare/commit/rollback
+ * requests to the record for the given xid.
+ *
+ * NOTE(review): no locking is visible in this class; confirm how callers on
+ * different connections are expected to serialise access.
+ */
+class DtxManager{
+ // xid -> work record
+ typedef boost::ptr_map<std::string, DtxWorkRecord> WorkMap;
+
+ WorkMap work;
+ // passed on to each DtxWorkRecord created in start()
+ TransactionalStore* const store;
+
+ // returns the entry for xid, or throws if there is none
+ WorkMap::iterator getWork(const std::string& xid);
+
+public:
+ DtxManager(TransactionalStore* const store);
+ ~DtxManager();
+ void start(std::string xid, DtxBuffer::shared_ptr work);
+ void prepare(const std::string& xid);
+ void commit(const std::string& xid);
+ void rollback(const std::string& xid);
+};
+
+}
+}
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp?view=auto&rev=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp Thu Apr 19 10:56:21 2007
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "DtxWorkRecord.h"
+#include <boost/format.hpp>
+#include <boost/mem_fn.hpp>
+using boost::mem_fn;
+
+using namespace qpid::broker;
+
+// Creates an empty record for `_xid`. `completed` must start out false:
+// checkCompletion() reads it before anything else assigns it.
+DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store)
+    : xid(_xid), store(_store), completed(false) {}
+
+DtxWorkRecord::~DtxWorkRecord() {}
+
+/**
+ * Prepares all branches under a store transaction begun for this xid.
+ *
+ * @return true if every branch prepared and the store transaction was
+ *         prepared; false if a branch failed, in which case the work is
+ *         aborted.
+ * @throws ConnectionException (503) via checkCompletion() if a branch has
+ *         not been ended.
+ */
+bool DtxWorkRecord::prepare()
+{
+    checkCompletion();
+    txn = store->begin(xid);
+    const bool allPrepared = prepare(txn.get());
+    if (allPrepared) {
+        store->prepare(*txn);
+    } else {
+        abort();
+    }
+    return allPrepared;
+}
+
+/**
+ * Prepares each branch in turn against the given transaction context,
+ * stopping at the first branch that fails.
+ *
+ * @return true if every branch prepared (or there are none), else false.
+ */
+bool DtxWorkRecord::prepare(TransactionContext* _txn)
+{
+    for (Work::iterator branch = work.begin(); branch != work.end(); ++branch) {
+        if (!(*branch)->prepare(_txn)) {
+            return false;
+        }
+    }
+    return true;
+}
+
+/**
+ * Commits the work for this xid.
+ *
+ * If prepare() has already been called, the prepared store transaction is
+ * committed. Otherwise a local store transaction is used to prepare and
+ * commit in one step. In both cases each branch's buffered operations are
+ * then committed. If the one-step prepare fails, the local transaction and
+ * the work are aborted.
+ *
+ * @throws ConnectionException (503) via checkCompletion() if a branch has
+ *         not been ended.
+ */
+void DtxWorkRecord::commit()
+{
+    checkCompletion();
+    if (txn.get()) {
+        //already prepared
+        store->commit(*txn);
+        txn.reset();
+
+        for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
+    } else {
+        //1pc commit optimisation, don't need a 2pc transaction context:
+        std::auto_ptr<TransactionContext> localtxn = store->begin();
+        if (prepare(localtxn.get())) {
+            store->commit(*localtxn);
+            // The branches must be committed here as well, exactly as in the
+            // prepared path above; otherwise their buffered operations are
+            // prepared in the store but never applied.
+            for_each(work.begin(), work.end(), mem_fn(&TxBuffer::commit));
+        } else {
+            store->abort(*localtxn);
+            abort();
+        }
+    }
+}
+
+/**
+ * Rolls back the work for this xid.
+ *
+ * checkCompletion() runs first, so this throws if any branch has not been
+ * ended; only then is the work aborted.
+ */
+void DtxWorkRecord::rollback()
+{
+ checkCompletion();
+ abort();
+}
+
+/**
+ * Adds a branch buffer to this record.
+ */
+void DtxWorkRecord::add(DtxBuffer::shared_ptr ops)
+{
+    work.push_back(ops);
+    // checkCompletion() caches a positive result in `completed`; a newly
+    // added branch has not been checked, so the cached result no longer holds.
+    completed = false;
+}
+
+/**
+ * Verifies that every branch in this record has been ended, remembering a
+ * successful result in `completed` so the scan is not repeated.
+ *
+ * @throws ConnectionException (503) if any branch has not been ended.
+ */
+void DtxWorkRecord::checkCompletion()
+{
+    if (completed) {
+        return;
+    }
+    for (Work::iterator branch = work.begin(); branch != work.end(); ++branch) {
+        const bool branchEnded = (*branch)->isEnded();
+        if (!branchEnded) {
+            throw ConnectionException(503, boost::format("Branch with xid %1% not completed!") % xid);
+        }
+    }
+    completed = true;
+}
+
+/**
+ * Aborts the prepared store transaction, if there is one, and rolls back
+ * every branch buffer.
+ */
+void DtxWorkRecord::abort()
+{
+    TPCTransactionContext* const prepared = txn.get();
+    if (prepared) {
+        store->abort(*prepared);
+        txn.reset();
+    }
+    for_each(work.begin(), work.end(), mem_fn(&TxBuffer::rollback));
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h?view=auto&rev=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h Thu Apr 19 10:56:21 2007
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _DtxWorkRecord_
+#define _DtxWorkRecord_
+
+#include <algorithm>
+#include <functional>
+#include <vector>
+#include "DtxBuffer.h"
+#include "TransactionalStore.h"
+#include "qpid/framing/amqp_types.h"
+
+namespace qpid {
+namespace broker {
+
+/**
+ * Holds the work (a list of DtxBuffers) associated with a single
+ * xid, together with the store used to prepare, commit or abort it.
+ */
+class DtxWorkRecord
+{
+    /** The buffers of operations added to this record. */
+    typedef std::vector<DtxBuffer::shared_ptr> Work;
+
+    /** Identifier of the transaction this record belongs to. */
+    const std::string xid;
+    /** Store on which transaction contexts are committed or aborted. */
+    TransactionalStore* const store;
+    /** Set once all buffers have been verified as ended. */
+    bool completed;
+    Work work;
+    /** Transaction context held while the work is in a prepared state. */
+    std::auto_ptr<TPCTransactionContext> txn;
+
+    /** Throws unless every buffer in the work list has been ended. */
+    void checkCompletion();
+    /** Aborts any held context and rolls back every buffer. */
+    void abort();
+    /** Prepares the buffers against the supplied context. */
+    bool prepare(TransactionContext* ctxt);
+
+public:
+    DtxWorkRecord(const std::string& xid, TransactionalStore* const store);
+    ~DtxWorkRecord();
+
+    bool prepare();
+    void commit();
+    void rollback();
+    /** Adds a buffer of operations to this record's work list. */
+    void add(DtxBuffer::shared_ptr ops);
+};
+
+}
+}
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp?view=diff&rev=530500&r1=530499&r2=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp Thu Apr 19 10:56:21 2007
@@ -24,17 +24,13 @@
using boost::mem_fn;
using namespace qpid::broker;
-bool TxBuffer::prepare(TransactionalStore* const store)
+bool TxBuffer::prepare(TransactionContext* const ctxt)
{
- std::auto_ptr<TransactionContext> ctxt;
- if(store) ctxt = store->begin();
for(op_iterator i = ops.begin(); i < ops.end(); i++){
- if(!(*i)->prepare(ctxt.get())){
- if(store) store->abort(*ctxt);
+ if(!(*i)->prepare(ctxt)){
return false;
}
}
- if(store) store->commit(*ctxt);
return true;
}
@@ -53,4 +49,19 @@
void TxBuffer::enlist(TxOp::shared_ptr op)
{
ops.push_back(op);
+}
+
+/**
+ * Drives a complete server-local commit of this buffer.
+ *
+ * When a store is supplied, a transaction is begun on it and the ops
+ * are prepared under that transaction; with no store the ops are
+ * prepared with a null context. If prepare succeeds the store
+ * transaction (if any) is committed and the ops are committed;
+ * otherwise the store transaction (if any) is aborted and the ops
+ * are rolled back.
+ *
+ * @returns true if the ops were prepared and committed, false if
+ * prepare failed and they were rolled back.
+ */
+bool TxBuffer::commitLocal(TransactionalStore* const store)
+{
+    //the context stays null when there is no store to begin it on
+    std::auto_ptr<TransactionContext> ctxt;
+    if (store) ctxt = store->begin();
+
+    const bool prepared = prepare(ctxt.get());
+    if (!prepared) {
+        if (store) store->abort(*ctxt);
+        rollback();
+        return false;
+    }
+
+    if (store) store->commit(*ctxt);
+    commit();
+    return true;
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h?view=diff&rev=530500&r1=530499&r2=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h Thu Apr 19 10:56:21 2007
@@ -61,44 +61,52 @@
class TxBuffer{
typedef std::vector<TxOp::shared_ptr>::iterator op_iterator;
std::vector<TxOp::shared_ptr> ops;
+ protected:
+
public:
+ typedef boost::shared_ptr<TxBuffer> shared_ptr;
+ /**
+ * Adds an operation to the transaction.
+ */
+ void enlist(TxOp::shared_ptr op);
+
/**
* Requests that all ops are prepared. This should
* primarily involve making sure that a persistent record
* of the operations is stored where necessary.
- *
- * All ops will be prepared under a transaction on the
- * specified store. If any operation fails on prepare,
- * this transaction will be rolled back.
- *
+ *
* Once prepared, a transaction can be committed (or in
* the 2pc case, rolled back).
*
* @returns true if all the operations prepared
* successfully, false if not.
*/
- bool prepare(TransactionalStore* const store);
+ bool prepare(TransactionContext* const ctxt);
+
/**
- * Signals that the ops all prepared all completed
- * successfully and can now commit, i.e. the operation can
- * now be fully carried out.
+ * Signals that the ops all prepared successfully and can
+ * now commit, i.e. the operation can now be fully carried
+ * out.
*
* Should only be called after a call to prepare() returns
* true.
*/
void commit();
+
/**
- * Rolls back all the operations.
+ * Signals that all ops can be rolled back.
*
* Should only be called either after a call to prepare()
* returns true (2pc) or instead of a prepare call
* ('server-local')
*/
void rollback();
+
/**
- * Adds an operation to the transaction.
+ * Helper method for managing the process of server local
+ * commit
*/
- void enlist(TxOp::shared_ptr op);
+ bool commitLocal(TransactionalStore* const store);
};
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp?view=diff&rev=530500&r1=530499&r2=530500
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TxBufferTest.cpp Thu Apr 19 10:56:21 2007
@@ -161,7 +161,9 @@
};
CPPUNIT_TEST_SUITE(TxBufferTest);
- CPPUNIT_TEST(testPrepareAndCommit);
+ CPPUNIT_TEST(testCommitLocal);
+ CPPUNIT_TEST(testFailOnCommitLocal);
+ CPPUNIT_TEST(testPrepare);
CPPUNIT_TEST(testFailOnPrepare);
CPPUNIT_TEST(testRollback);
CPPUNIT_TEST(testBufferIsClearedAfterRollback);
@@ -170,14 +172,14 @@
public:
- void testPrepareAndCommit(){
+ void testCommitLocal(){
MockTransactionalStore store;
store.expectBegin().expectCommit();
MockTxOp::shared_ptr opA(new MockTxOp());
opA->expectPrepare().expectCommit();
MockTxOp::shared_ptr opB(new MockTxOp());
- opB->expectPrepare().expectPrepare().expectCommit().expectCommit();//opB enlisted twice to test reative order
+ opB->expectPrepare().expectPrepare().expectCommit().expectCommit();//opB enlisted twice to test relative order
MockTxOp::shared_ptr opC(new MockTxOp());
opC->expectPrepare().expectCommit();
@@ -187,8 +189,7 @@
buffer.enlist(static_pointer_cast<TxOp>(opB));//opB enlisted twice
buffer.enlist(static_pointer_cast<TxOp>(opC));
- CPPUNIT_ASSERT(buffer.prepare(&store));
- buffer.commit();
+ CPPUNIT_ASSERT(buffer.commitLocal(&store));
store.check();
CPPUNIT_ASSERT(store.isCommitted());
opA->check();
@@ -196,11 +197,51 @@
opC->check();
}
- void testFailOnPrepare(){
+ void testFailOnCommitLocal(){
MockTransactionalStore store;
store.expectBegin().expectAbort();
MockTxOp::shared_ptr opA(new MockTxOp());
+ opA->expectPrepare().expectRollback();
+ MockTxOp::shared_ptr opB(new MockTxOp(true));
+ opB->expectPrepare().expectRollback();
+ MockTxOp::shared_ptr opC(new MockTxOp());//will never get prepare as b will fail
+ opC->expectRollback();
+
+ TxBuffer buffer;
+ buffer.enlist(static_pointer_cast<TxOp>(opA));
+ buffer.enlist(static_pointer_cast<TxOp>(opB));
+ buffer.enlist(static_pointer_cast<TxOp>(opC));
+
+ CPPUNIT_ASSERT(!buffer.commitLocal(&store));
+ CPPUNIT_ASSERT(store.isAborted());
+ store.check();
+ opA->check();
+ opB->check();
+ opC->check();
+ }
+
+ void testPrepare(){
+     //three mock ops, each set up to expect a prepare call
+     const int count = 3;
+     MockTxOp::shared_ptr ops[count];
+     for (int i = 0; i < count; ++i) {
+         ops[i] = MockTxOp::shared_ptr(new MockTxOp());
+         ops[i]->expectPrepare();
+     }
+
+     //enlist them in creation order
+     TxBuffer buffer;
+     for (int i = 0; i < count; ++i) {
+         buffer.enlist(static_pointer_cast<TxOp>(ops[i]));
+     }
+
+     //prepare is called with a null transaction context
+     CPPUNIT_ASSERT(buffer.prepare(0));
+     for (int i = 0; i < count; ++i) {
+         ops[i]->check();
+     }
+ }
+
+ void testFailOnPrepare(){
+ MockTxOp::shared_ptr opA(new MockTxOp());
opA->expectPrepare();
MockTxOp::shared_ptr opB(new MockTxOp(true));
opB->expectPrepare();
@@ -211,9 +252,7 @@
buffer.enlist(static_pointer_cast<TxOp>(opB));
buffer.enlist(static_pointer_cast<TxOp>(opC));
- CPPUNIT_ASSERT(!buffer.prepare(&store));
- store.check();
- CPPUNIT_ASSERT(store.isAborted());
+ CPPUNIT_ASSERT(!buffer.prepare(0));
opA->check();
opB->check();
opC->check();