You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/01/23 11:15:49 UTC
svn commit: r1560618 [4/5] - in /qpid/branches/java-broker-bdb-ha: ./ qpid/
qpid/bin/ qpid/cpp/
qpid/cpp/bindings/qpid/dotnet/examples/msvc10/csharp.direct.receiver/
qpid/cpp/bindings/qpid/dotnet/examples/msvc10/csharp.direct.sender/
qpid/cpp/bindings/...
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp Thu Jan 23 10:15:46 2014
@@ -49,6 +49,18 @@ namespace qpid {
namespace linearstore {
namespace journal {
+// Member-wise constructor: each argument is copied unchanged into the member
+// of the same meaning. No validation is performed here.
+//   rid   - record id
+//   fid   - id of the journal file that holds the record
+//   foffs - offset of the record within that file
+//   ptxn  - pending-transaction flag stored in pendingTransaction_
+RecoveredRecordData_t::RecoveredRecordData_t(const uint64_t rid,
+                                             const uint64_t fid,
+                                             const std::streampos foffs,
+                                             bool ptxn) :
+    recordId_(rid), fileId_(fid), fileOffset_(foffs), pendingTransaction_(ptxn)
+{
+}
+
+
+// Ordering predicate on recovered records: true when a's record id is
+// strictly smaller than b's. Only recordId_ takes part in the comparison.
+bool recordIdListCompare(RecoveredRecordData_t a, RecoveredRecordData_t b) {
+    const bool aPrecedesB = b.recordId_ > a.recordId_;
+    return aPrecedesB;
+}
+
RecoveryManager::RecoveryManager(const std::string& journalDirectory,
const std::string& queuename,
enq_map& enqueueMapRef,
@@ -86,6 +98,9 @@ void RecoveryManager::analyzeJournals(co
if (!journalEmptyFlag_) {
// Read all records, establish remaining enqueued records
+ if (inFileStream_.is_open()) {
+ inFileStream_.close();
+ }
while (getNextRecordHeader()) {}
if (inFileStream_.is_open()) {
inFileStream_.close();
@@ -102,14 +117,13 @@ void RecoveryManager::analyzeJournals(co
std::vector<std::string>::const_iterator pitr =
std::find(preparedTransactionListPtr->begin(), preparedTransactionListPtr->end(), *itr);
if (pitr == preparedTransactionListPtr->end()) { // not found in prepared list
- txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(*itr); // tdl will be empty if xid not found
+ txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(*itr); // tdl will be empty if xid not found
// Unlock any affected enqueues in emap
- for (tdl_itr i=tdl.begin(); i<tdl.end(); i++) {
+ for (tdl_itr_t i=tdl.begin(); i<tdl.end(); i++) {
if (i->enq_flag_) { // enq op - decrement enqueue count
fileNumberMap_[i->pfid_]->decrEnqueuedRecordCount();
} else if (enqueueMapRef_.is_enqueued(i->drid_, true)) { // deq op - unlock enq record
- int16_t ret = enqueueMapRef_.unlock(i->drid_);
- if (ret < enq_map::EMAP_OK) { // fail
+ if (enqueueMapRef_.unlock(i->drid_) < enq_map::EMAP_OK) { // fail
// enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND
std::ostringstream oss;
oss << std::hex << "_emap.unlock(): drid=0x\"" << i->drid_;
@@ -120,11 +134,7 @@ void RecoveryManager::analyzeJournals(co
}
}
}
-
- // Set up recordIdList_ from enqueue map
- enqueueMapRef_.rid_list(recordIdList_);
-
- recordIdListConstItr_ = recordIdList_.begin();
+ prepareRecordList();
}
}
@@ -151,37 +161,44 @@ bool RecoveryManager::readNextRemainingR
bool& transient,
bool& external,
data_tok* const dtokp,
- bool /*ignore_pending_txns*/) {
- if (recordIdListConstItr_ == recordIdList_.end()) {
- return false;
- }
- enq_map::emap_data_struct_t eds;
- enqueueMapRef_.get_data(*recordIdListConstItr_, eds);
- if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != eds._pfid) {
- getFile(eds._pfid, false);
- }
-//std::cout << " " << eds._pfid << std::hex << ",0x" << eds._file_posn << std::flush; // DEBUG
+ bool ignore_pending_txns) {
+ bool foundRecord = false;
+ do {
+ if (recordIdListConstItr_ == recordIdList_.end()) {
+ return false;
+ }
+ if (recordIdListConstItr_->pendingTransaction_ && ignore_pending_txns) { // Pending transaction
+ ++recordIdListConstItr_; // ignore, go to next record
+ } else {
+ foundRecord = true;
+ }
+ } while (!foundRecord);
- inFileStream_.seekg(eds._file_posn, std::ifstream::beg);
+ if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != recordIdListConstItr_->fileId_) {
+ if (!getFile(recordIdListConstItr_->fileId_, false)) {
+ std::ostringstream oss;
+ oss << "Failed to open file with file-id=" << recordIdListConstItr_->fileId_;
+ throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
+ }
+ }
+ inFileStream_.seekg(recordIdListConstItr_->fileOffset_, std::ifstream::beg);
if (!inFileStream_.good()) {
std::ostringstream oss;
- oss << "Could not find offset 0x" << std::hex << eds._file_posn << " in file " << getCurrentFileName();
+ oss << "Could not find offset 0x" << std::hex << recordIdListConstItr_->fileOffset_ << " in file " << getCurrentFileName();
throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
}
+
::enq_hdr_t enqueueHeader;
inFileStream_.read((char*)&enqueueHeader, sizeof(::enq_hdr_t));
if (inFileStream_.gcount() != sizeof(::enq_hdr_t)) {
std::ostringstream oss;
- oss << "Could not read enqueue header from file " << getCurrentFileName() << " at offset 0x" << std::hex << eds._file_posn;
+ oss << "Could not read enqueue header from file " << getCurrentFileName() << " at offset 0x" << std::hex << recordIdListConstItr_->fileOffset_;
throw jexception(jerrno::JERR__FILEIO, oss.str(), "RecoveryManager", "readNextRemainingRecord");
}
// check flags
transient = ::is_enq_transient(&enqueueHeader);
external = ::is_enq_external(&enqueueHeader);
-//char magicBuff[5]; // DEBUG
-//::memcpy(magicBuff, &enqueueHeader, 4); // DEBUG
-//magicBuff[4] = 0; // DEBUG
-//std::cout << std::hex << ":" << (char*)magicBuff << ",rid=0x" << enqueueHeader._rhdr._rid << ",xs=0x" << enqueueHeader._xidsize << ",ds=0x" << enqueueHeader._dsize << std::dec << std::flush; // DEBUG
+
// read xid
xidSize = enqueueHeader._xidsize;
*xidPtrPtr = ::malloc(xidSize);
@@ -386,6 +403,12 @@ void RecoveryManager::checkFileStreamOk(
}
void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) {
+ if (recordPosition % QLS_DBLK_SIZE_BYTES != 0) {
+ std::ostringstream oss;
+ oss << "Current read pointer not dblk aligned: recordPosition=0x" << std::hex << recordPosition;
+ oss << " (dblk alignment offset = 0x" << (recordPosition % QLS_DBLK_SIZE_BYTES);
+ throw jexception(jerrno::JERR_RCVM_NOTDBLKALIGNED, oss.str(), "RecoveryManager", "checkJournalAlignment");
+ }
std::streampos currentPosn = recordPosition;
unsigned sblkOffset = currentPosn % QLS_SBLK_SIZE_BYTES;
if (sblkOffset)
@@ -433,7 +456,7 @@ void RecoveryManager::checkJournalAlignm
bool RecoveryManager::decodeRecord(jrec& record,
std::size_t& cumulativeSizeRead,
::rec_hdr_t& headerRecord,
- std::streampos& fileOffset)
+ std::streampos& fileOffset)
{
std::streampos start_file_offs = fileOffset;
@@ -455,7 +478,6 @@ bool RecoveryManager::decodeRecord(jrec&
}
if (!done && needNextFile()) {
if (!getNextFile(false)) {
- checkJournalAlignment(start_file_offs);
return false;
}
}
@@ -574,7 +596,7 @@ bool RecoveryManager::getNextRecordHeade
throw jexception(jerrno::JERR_RCVM_NULLXID, "ENQ", "RecoveryManager", "getNextRecordHeader");
}
std::string xid((char*)xidp, er.xid_size());
- transactionMapRef_.insert_txn_data(xid, txn_data_t(h._rid, 0, start_fid, file_pos, true));
+ transactionMapRef_.insert_txn_data(xid, txn_data_t(h._rid, 0, start_fid, file_pos, true, false, false));
if (transactionMapRef_.set_aio_compl(xid, h._rid) < txn_map::TMAP_OK) { // fail - xid or rid not found
std::ostringstream oss;
oss << std::hex << "_tmap.set_aio_compl: txn_enq xid=\"" << xid << "\" rid=0x" << h._rid;
@@ -613,7 +635,7 @@ bool RecoveryManager::getNextRecordHeade
}
std::string xid((char*)xidp, dr.xid_size());
transactionMapRef_.insert_txn_data(xid, txn_data_t(dr.rid(), dr.deq_rid(), start_fid, file_pos,
- false, dr.is_txn_coml_commit()));
+ false, false, dr.is_txn_coml_commit()));
if (transactionMapRef_.set_aio_compl(xid, dr.rid()) < txn_map::TMAP_OK) { // fail - xid or rid not found
std::ostringstream oss;
oss << std::hex << "_tmap.set_aio_compl: txn_deq xid=\"" << xid << "\" rid=0x" << dr.rid();
@@ -645,8 +667,8 @@ bool RecoveryManager::getNextRecordHeade
throw jexception(jerrno::JERR_RCVM_NULLXID, "ABT", "RecoveryManager", "getNextRecordHeader");
}
std::string xid((char*)xidp, ar.xid_size());
- txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
- for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) {
+ txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+ for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) {
if (itr->enq_flag_) {
fileNumberMap_[itr->pfid_]->decrEnqueuedRecordCount();
} else {
@@ -673,8 +695,8 @@ bool RecoveryManager::getNextRecordHeade
throw jexception(jerrno::JERR_RCVM_NULLXID, "CMT", "RecoveryManager", "getNextRecordHeader");
}
std::string xid((char*)xidp, cr.xid_size());
- txn_data_list tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
- for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++) {
+ txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+ for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) {
if (itr->enq_flag_) { // txn enqueue
//std::cout << "[rid=0x" << std::hex << itr->rid_ << std::dec << " fid=" << itr->pfid_ << " fpos=0x" << std::hex << itr->foffs_ << "]" << std::dec << std::flush; // DEBUG
if (enqueueMapRef_.insert_pfid(itr->rid_, itr->pfid_, itr->foffs_) < enq_map::EMAP_OK) { // fail
@@ -725,6 +747,35 @@ bool RecoveryManager::needNextFile() {
return true;
}
+// Rebuilds recordIdList_ from the enqueue map and the transaction map, sorts
+// it by record id and rewinds recordIdListConstItr_ to its first element.
+void RecoveryManager::prepareRecordList() {
+    recordIdList_.clear();
+
+    // Part 1: every rid reported by the enqueue map, flagged as not pending.
+    std::vector<uint64_t> enqueuedRids;
+    enqueueMapRef_.rid_list(enqueuedRids);
+    qpid::linearstore::journal::enq_map::emap_data_struct_t enqData;
+    std::vector<uint64_t>::const_iterator ridItr = enqueuedRids.begin();
+    while (ridItr != enqueuedRids.end()) {
+        enqueueMapRef_.get_data(*ridItr, enqData);
+        recordIdList_.push_back(RecoveredRecordData_t(*ridItr, enqData._pfid, enqData._file_posn, false));
+        ++ridItr;
+    }
+
+    // Part 2: enqueue operations still held in the transaction map, flagged
+    // as belonging to a pending transaction.
+    std::vector<std::string> pendingXids;
+    transactionMapRef_.xid_list(pendingXids);
+    for (std::vector<std::string>::const_iterator xidItr = pendingXids.begin(); xidItr != pendingXids.end(); ++xidItr) {
+        qpid::linearstore::journal::txn_data_list_t txnOps = transactionMapRef_.get_tdata_list(*xidItr);
+        qpid::linearstore::journal::tdl_itr_t opItr = txnOps.begin();
+        for (; opItr != txnOps.end(); ++opItr) {
+            if (!opItr->enq_flag_) {
+                continue; // only enqueue operations contribute a record
+            }
+            recordIdList_.push_back(RecoveredRecordData_t(opItr->rid_, opItr->pfid_, opItr->foffs_, true));
+        }
+    }
+
+    // Both sources are merged into a single list ordered by ascending record id.
+    std::sort(recordIdList_.begin(), recordIdList_.end(), recordIdListCompare);
+    recordIdListConstItr_ = recordIdList_.begin();
+}
+
void RecoveryManager::readJournalData(char* target,
const std::streamsize readSize) {
std::streamoff bytesRead = 0;
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h Thu Jan 23 10:15:46 2014
@@ -22,7 +22,6 @@
#ifndef QPID_LINEARSTORE_JOURNAL_RECOVERYSTATE_H_
#define QPID_LINEARSTORE_JOURNAL_RECOVERYSTATE_H_
-#include <deque>
#include <fstream>
#include <map>
#include "qpid/linearstore/journal/LinearFileController.h"
@@ -44,6 +43,16 @@ class JournalLog;
class jrec;
class txn_map;
+struct RecoveredRecordData_t { // Describes one recovered record; element type of RecoveryManager::recordIdList_t
+ uint64_t recordId_; ///< Record id; the only field recordIdListCompare() compares
+ uint64_t fileId_; ///< Id of the journal file that holds the record
+ std::streampos fileOffset_; ///< Offset of the record within that file
+ bool pendingTransaction_; ///< Pending-transaction flag for this record
+ RecoveredRecordData_t(const uint64_t rid, const uint64_t fid, const std::streampos foffs, bool ptxn); ///< Sets the four members from the arguments, in declaration order
+};
+
+bool recordIdListCompare(RecoveredRecordData_t a, RecoveredRecordData_t b);
+
class RecoveryManager
{
protected:
@@ -53,7 +62,7 @@ protected:
typedef std::map<uint64_t, JournalFile*> fileNumberMap_t;
typedef fileNumberMap_t::iterator fileNumberMapItr_t;
typedef fileNumberMap_t::const_iterator fileNumberMapConstItr_t;
- typedef std::vector<uint64_t> recordIdList_t;
+ typedef std::vector<RecoveredRecordData_t> recordIdList_t;
typedef recordIdList_t::const_iterator recordIdListConstItr_t;
// Location and identity
@@ -123,6 +132,7 @@ protected:
bool getNextFile(bool jumpToFirstRecordOffsetFlag);
bool getNextRecordHeader();
bool needNextFile();
+ void prepareRecordList();
bool readFileHeader();
void readJournalData(char* target, const std::streamsize size);
void removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr);
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcfg.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcfg.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcfg.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcfg.h Thu Jan 23 10:15:46 2014
@@ -43,8 +43,8 @@
#define QLS_WMGR_MAXWAITUS 100 /**< Max. wait time (us) before submitting AIO */
#define QLS_JRNL_FILE_EXTENSION ".jrnl" /**< Extension for journal data files */
-#define QLS_TXA_MAGIC 0x61534c51 /**< ("RHMa" in little endian) Magic for dtx abort hdrs */
-#define QLS_TXC_MAGIC 0x63534c51 /**< ("RHMc" in little endian) Magic for dtx commit hdrs */
+#define QLS_TXA_MAGIC 0x61534c51 /**< ("QLSa" in little endian) Magic for dtx abort hdrs */
+#define QLS_TXC_MAGIC 0x63534c51 /**< ("QLSc" in little endian) Magic for dtx commit hdrs */
#define QLS_DEQ_MAGIC 0x64534c51 /**< ("QLSd" in little endian) Magic for deq rec hdrs */
#define QLS_ENQ_MAGIC 0x65534c51 /**< ("QLSe" in little endian) Magic for enq rec hdrs */
#define QLS_FILE_MAGIC 0x66534c51 /**< ("QLSf" in little endian) Magic for file hdrs */
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp Thu Jan 23 10:15:46 2014
@@ -155,7 +155,7 @@ jcntl::enqueue_data_record(const void* c
check_wstatus("enqueue_data_record");
{
slock s(_wr_mutex);
- while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, 0, 0, transient, false), r,
+ while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, 0, 0, false, transient, false), r,
dtokp)) ;
}
return r;
@@ -170,7 +170,7 @@ jcntl::enqueue_extern_data_record(const
check_wstatus("enqueue_extern_data_record");
{
slock s(_wr_mutex);
- while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, 0, 0, transient, true), r, dtokp)) ;
+ while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, 0, 0, false, transient, true), r, dtokp)) ;
}
return r;
}
@@ -181,6 +181,7 @@ jcntl::enqueue_txn_data_record(const voi
const std::size_t this_data_len,
data_tok* dtokp,
const std::string& xid,
+ const bool tpc_flag,
const bool transient)
{
iores r;
@@ -188,7 +189,7 @@ jcntl::enqueue_txn_data_record(const voi
{
slock s(_wr_mutex);
while (handle_aio_wait(_wmgr.enqueue(data_buff, tot_data_len, this_data_len, dtokp, xid.data(), xid.size(),
- transient, false), r, dtokp)) ;
+ tpc_flag, transient, false), r, dtokp)) ;
}
return r;
}
@@ -197,14 +198,15 @@ iores
jcntl::enqueue_extern_txn_data_record(const std::size_t tot_data_len,
data_tok* dtokp,
const std::string& xid,
+ const bool tpc_flag,
const bool transient)
{
iores r;
check_wstatus("enqueue_extern_txn_data_record");
{
slock s(_wr_mutex);
- while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, xid.data(), xid.size(), transient, true), r,
- dtokp)) ;
+ while (handle_aio_wait(_wmgr.enqueue(0, tot_data_len, 0, dtokp, xid.data(), xid.size(), tpc_flag, transient,
+ true), r, dtokp)) ;
}
return r;
}
@@ -234,7 +236,7 @@ jcntl::dequeue_data_record(data_tok* con
check_wstatus("dequeue_data");
{
slock s(_wr_mutex);
- while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0, txn_coml_commit), r, dtokp)) ;
+ while (handle_aio_wait(_wmgr.dequeue(dtokp, 0, 0, false, txn_coml_commit), r, dtokp)) ;
}
return r;
}
@@ -242,13 +244,14 @@ jcntl::dequeue_data_record(data_tok* con
iores
jcntl::dequeue_txn_data_record(data_tok* const dtokp,
const std::string& xid,
+ const bool tpc_flag,
const bool txn_coml_commit)
{
iores r;
check_wstatus("dequeue_data");
{
slock s(_wr_mutex);
- while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size(), txn_coml_commit), r, dtokp)) ;
+ while (handle_aio_wait(_wmgr.dequeue(dtokp, xid.data(), xid.size(), tpc_flag, txn_coml_commit), r, dtokp)) ;
}
return r;
}
@@ -380,7 +383,7 @@ jcntl::handle_aio_wait(const iores res,
while (_wmgr.curr_pg_blocked())
{
if (_wmgr.get_aio_evt_rem() == 0) {
-std::cout << "&&&&&& jcntl::handle_aio_wait() " << _wmgr.status_str() << std::endl; // DEBUG
+//std::cout << "&&&&&& jcntl::handle_aio_wait() " << _wmgr.status_str() << std::endl; // DEBUG
throw jexception("_wmgr.curr_pg_blocked() with no events remaining"); // TODO - complete exception
}
if (_wmgr.get_events(&_aio_cmpl_timeout, false) == jerrno::AIO_TIMEOUT)
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcntl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcntl.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcntl.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcntl.h Thu Jan 23 10:15:46 2014
@@ -270,11 +270,11 @@ public:
const std::size_t tot_data_len,
const std::size_t this_data_len,
data_tok* dtokp,
- const bool transient = false);
+ const bool transient);
iores enqueue_extern_data_record(const std::size_t tot_data_len,
data_tok* dtokp,
- const bool transient = false);
+ const bool transient);
/**
* \brief Enqueue data.
@@ -294,12 +294,14 @@ public:
const std::size_t this_data_len,
data_tok* dtokp,
const std::string& xid,
- const bool transient = false);
+ const bool tpc_flag,
+ const bool transient);
iores enqueue_extern_txn_data_record(const std::size_t tot_data_len,
data_tok* dtokp,
const std::string& xid,
- const bool transient = false);
+ const bool tpc_flag,
+ const bool transient);
/**
* \brief Reads data from the journal. It is the responsibility of the reader to free
@@ -350,7 +352,7 @@ public:
bool& transient,
bool& external,
data_tok* const dtokp,
- bool ignore_pending_txns = false);
+ bool ignore_pending_txns);
/**
* \brief Dequeues (marks as no longer needed) data record in journal.
@@ -370,7 +372,7 @@ public:
* \exception TODO
*/
iores dequeue_data_record(data_tok* const dtokp,
- const bool txn_coml_commit = false);
+ const bool txn_coml_commit);
/**
* \brief Dequeues (marks as no longer needed) data record in journal.
@@ -393,7 +395,8 @@ public:
*/
iores dequeue_txn_data_record(data_tok* const dtokp,
const std::string& xid,
- const bool txn_coml_commit = false);
+ const bool tpc_flag,
+ const bool txn_coml_commit);
/**
* \brief Abort the transaction for all records enqueued or dequeued with the matching xid.
@@ -458,12 +461,12 @@ public:
* \param block_till_aio_cmpl If true, will block the thread while waiting for all
* outstanding AIO operations to complete.
*/
- void stop(const bool block_till_aio_cmpl = false);
+ void stop(const bool block_till_aio_cmpl);
/**
* \brief Force a flush of the write page cache, creating a single AIO write operation.
*/
- iores flush(const bool block_till_aio_cmpl = false);
+ iores flush(const bool block_till_aio_cmpl);
inline uint32_t get_enq_cnt() const { return _emap.size(); } // TODO: _emap: Thread safe?
@@ -480,7 +483,7 @@ public:
* false if the rid is transactionally enqueued and is not committed, or if it is
* locked (i.e. transactionally dequeued, but the dequeue has not been committed).
*/
- inline bool is_enqueued(const uint64_t rid, bool ignore_lock = false) { return _emap.is_enqueued(rid, ignore_lock); }
+ inline bool is_enqueued(const uint64_t rid, bool ignore_lock) { return _emap.is_enqueued(rid, ignore_lock); }
inline bool is_locked(const uint64_t rid) {
if (_emap.is_enqueued(rid, true) < enq_map::EMAP_OK)
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp Thu Jan 23 10:15:46 2014
@@ -92,6 +92,8 @@ const uint32_t jerrno::JERR_RCVM_STREAMB
const uint32_t jerrno::JERR_RCVM_READ = 0x0902; ///< Read error: no or insufficient data to read
const uint32_t jerrno::JERR_RCVM_WRITE = 0x0903; ///< Write error
const uint32_t jerrno::JERR_RCVM_NULLXID = 0x0904; ///< Null XID when XID length non-null in header
+const uint32_t jerrno::JERR_RCVM_NOTDBLKALIGNED = 0x0905; ///< Offset is not data block (dblk)-aligned
+
// class data_tok
const uint32_t jerrno::JERR_DTOK_ILLEGALSTATE = 0x0a00;
@@ -182,6 +184,7 @@ jerrno::__init()
_err_map[JERR_RCVM_READ] = "JERR_RCVM_READ: Read error: no or insufficient data to read";
_err_map[JERR_RCVM_WRITE] = "JERR_RCVM_WRITE: Write error";
_err_map[JERR_RCVM_NULLXID] = "JERR_RCVM_NULLXID: Null XID when XID length non-null in header";
+ _err_map[JERR_RCVM_NOTDBLKALIGNED] = "JERR_RCVM_NOTDBLKALIGNED: Offset is not data block (dblk)-aligned";
// class data_tok
_err_map[JERR_DTOK_ILLEGALSTATE] = "JERR_MTOK_ILLEGALSTATE: Attempted to change to illegal state.";
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jerrno.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jerrno.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jerrno.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jerrno.h Thu Jan 23 10:15:46 2014
@@ -110,6 +110,7 @@ namespace journal {
static const uint32_t JERR_RCVM_READ; ///< Read error: no or insufficient data to read
static const uint32_t JERR_RCVM_WRITE; ///< Write error
static const uint32_t JERR_RCVM_NULLXID; ///< Null XID when XID length non-null in header
+ static const uint32_t JERR_RCVM_NOTDBLKALIGNED; ///< Offset is not data block (dblk)-aligned
// class data_tok
static const uint32_t JERR_DTOK_ILLEGALSTATE; ///< Attempted to change to illegal state
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_map.cpp Thu Jan 23 10:15:46 2014
@@ -39,16 +39,50 @@ txn_data_t::txn_data_t(const uint64_t ri
const uint16_t pfid,
const uint64_t foffs,
const bool enq_flag,
+ const bool tpc_flag,
const bool commit_flag):
rid_(rid),
drid_(drid),
pfid_(pfid),
foffs_(foffs),
enq_flag_(enq_flag),
+ tpc_flag_(tpc_flag),
commit_flag_(commit_flag),
aio_compl_(false)
{}
+// Tallies the operations in a transaction data list.
+//   - enqCnt / deqCnt: entries with enq_flag_ set / clear
+//   - commitCnt / abortCnt: dequeue entries with commit_flag_ set / clear
+//   - tpcCnt: entries (of either kind) with tpc_flag_ set
+//   - rid: rid_ of the last enqueue entry seen, 0 if there is none
+// Throws jexception if only some of the entries carry the 2PC flag, or if the
+// list contains both abort and commit dequeues.
+txn_op_stats_t::txn_op_stats_t(const txn_data_list_t& tdl) :
+    enqCnt(0U), deqCnt(0U), tpcCnt(0U), abortCnt(0U), commitCnt(0U), rid(0ULL)
+{
+    tdl_const_itr_t op = tdl.begin();
+    while (op != tdl.end()) {
+        if (op->tpc_flag_) {
+            ++tpcCnt;
+        }
+        if (!op->enq_flag_) {
+            ++deqCnt;
+            if (op->commit_flag_) {
+                ++commitCnt;
+            } else {
+                ++abortCnt;
+            }
+        } else {
+            ++enqCnt;
+            rid = op->rid_;
+        }
+        ++op;
+    }
+
+    // The 2PC flag must be carried by all entries or by none of them.
+    const bool partialTpc = tpcCnt > 0 && tpcCnt != tdl.size();
+    if (partialTpc) {
+        throw jexception("Inconsistent 2PC count"); // TODO: complete exception details
+    }
+    // A single transaction cannot hold both abort and commit dequeues.
+    const bool abortAndCommit = abortCnt > 0 && commitCnt > 0;
+    if (abortAndCommit) {
+        throw jexception("Both abort and commit in same transaction"); // TODO: complete exception details
+    }
+}
+
txn_map::txn_map():
_map()/*,
_pfid_txn_cnt()*/
@@ -64,7 +98,7 @@ txn_map::insert_txn_data(const std::stri
xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // not found in map
{
- txn_data_list list;
+ txn_data_list_t list;
list.push_back(td);
std::pair<xmap_itr, bool> ret = _map.insert(xmap_param(xid, list));
if (!ret.second) // duplicate
@@ -75,14 +109,14 @@ txn_map::insert_txn_data(const std::stri
return ok;
}
-const txn_data_list
+const txn_data_list_t
txn_map::get_tdata_list(const std::string& xid)
{
slock s(_mutex);
return get_tdata_list_nolock(xid);
}
-const txn_data_list
+const txn_data_list_t
txn_map::get_tdata_list_nolock(const std::string& xid)
{
xmap_itr itr = _map.find(xid);
@@ -91,14 +125,14 @@ txn_map::get_tdata_list_nolock(const std
return itr->second;
}
-const txn_data_list
+const txn_data_list_t
txn_map::get_remove_tdata_list(const std::string& xid)
{
slock s(_mutex);
xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // not found in map
return _empty_data_list;
- txn_data_list list = itr->second;
+ txn_data_list_t list = itr->second;
_map.erase(itr);
return list;
}
@@ -130,7 +164,7 @@ txn_map::cnt(const bool enq_flag)
uint32_t c = 0;
for (xmap_itr i = _map.begin(); i != _map.end(); i++)
{
- for (tdl_itr j = i->second.begin(); j < i->second.end(); j++)
+ for (tdl_itr_t j = i->second.begin(); j < i->second.end(); j++)
{
if (j->enq_flag_ == enq_flag)
c++;
@@ -147,7 +181,7 @@ txn_map::is_txn_synced(const std::string
if (itr == _map.end()) // not found in map
return TMAP_XID_NOT_FOUND;
bool is_synced = true;
- for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
+ for (tdl_itr_t litr = itr->second.begin(); litr < itr->second.end(); litr++)
{
if (!litr->aio_compl_)
{
@@ -165,7 +199,7 @@ txn_map::set_aio_compl(const std::string
xmap_itr itr = _map.find(xid);
if (itr == _map.end()) // xid not found in map
return TMAP_XID_NOT_FOUND;
- for (tdl_itr litr = itr->second.begin(); litr < itr->second.end(); litr++)
+ for (tdl_itr_t litr = itr->second.begin(); litr < itr->second.end(); litr++)
{
if (litr->rid_ == rid)
{
@@ -183,8 +217,8 @@ txn_map::data_exists(const std::string&
bool found = false;
{
slock s(_mutex);
- txn_data_list tdl = get_tdata_list_nolock(xid);
- tdl_itr itr = tdl.begin();
+ txn_data_list_t tdl = get_tdata_list_nolock(xid);
+ tdl_itr_t itr = tdl.begin();
while (itr != tdl.end() && !found)
{
found = itr->rid_ == rid;
@@ -202,8 +236,8 @@ txn_map::is_enq(const uint64_t rid)
slock s(_mutex);
for (xmap_itr i = _map.begin(); i != _map.end() && !found; i++)
{
- txn_data_list list = i->second;
- for (tdl_itr j = list.begin(); j < list.end() && !found; j++)
+ txn_data_list_t list = i->second;
+ for (tdl_itr_t j = list.begin(); j < list.end() && !found; j++)
{
if (j->enq_flag_)
found = j->rid_ == rid;
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_map.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_map.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_map.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_map.h Thu Jan 23 10:15:46 2014
@@ -42,17 +42,31 @@ namespace journal {
uint16_t pfid_; ///< Physical file id, to be used when transferring to emap on commit
uint64_t foffs_; ///< Offset in file for this record
bool enq_flag_; ///< If true, enq op, otherwise deq op
- bool commit_flag_; ///< (2PC transactions) Records 2PC complete c/a mode
+ bool tpc_flag_; ///< 2PC transaction if true
+ bool commit_flag_; ///< TPL only: (2PC transactions) Records 2PC complete c/a mode
bool aio_compl_; ///< Initially false, set to true when record AIO returns
txn_data_t(const uint64_t rid,
const uint64_t drid,
const uint16_t pfid,
const uint64_t foffs,
const bool enq_flag,
- const bool commit_flag = false);
+ const bool tpc_flag,
+ const bool commit_flag);
} txn_data_t;
- typedef std::vector<txn_data_t> txn_data_list;
- typedef txn_data_list::iterator tdl_itr;
+ typedef std::vector<txn_data_t> txn_data_list_t;
+ typedef txn_data_list_t::iterator tdl_itr_t;
+ typedef txn_data_list_t::const_iterator tdl_const_itr_t;
+
+ typedef struct txn_op_stats_t // Operation counts gathered over one txn_data_list_t
+ {
+ uint16_t enqCnt; ///< Number of entries with enq_flag_ set
+ uint16_t deqCnt; ///< Number of entries with enq_flag_ clear
+ uint16_t tpcCnt; ///< Number of entries with tpc_flag_ set
+ uint16_t abortCnt; ///< Number of dequeue entries with commit_flag_ clear
+ uint16_t commitCnt; ///< Number of dequeue entries with commit_flag_ set
+ uint64_t rid; ///< rid_ of the last enqueue entry in the list (0 if there is none)
+ txn_op_stats_t(const txn_data_list_t& tdl); ///< Tallies tdl; throws jexception on a partial 2PC flag count or on both abort and commit entries
+ } txn_op_stats_t;
/**
* \class txn_map
@@ -100,21 +114,21 @@ namespace journal {
static int16_t TMAP_SYNCED;
private:
- typedef std::pair<std::string, txn_data_list> xmap_param;
- typedef std::map<std::string, txn_data_list> xmap;
+ typedef std::pair<std::string, txn_data_list_t> xmap_param;
+ typedef std::map<std::string, txn_data_list_t> xmap;
typedef xmap::iterator xmap_itr;
xmap _map;
smutex _mutex;
- const txn_data_list _empty_data_list;
+ const txn_data_list_t _empty_data_list;
public:
txn_map();
virtual ~txn_map();
bool insert_txn_data(const std::string& xid, const txn_data_t& td);
- const txn_data_list get_tdata_list(const std::string& xid);
- const txn_data_list get_remove_tdata_list(const std::string& xid);
+ const txn_data_list_t get_tdata_list(const std::string& xid);
+ const txn_data_list_t get_remove_tdata_list(const std::string& xid);
bool in_map(const std::string& xid);
uint32_t enq_cnt();
uint32_t deq_cnt();
@@ -128,7 +142,7 @@ namespace journal {
void xid_list(std::vector<std::string>& xv);
private:
uint32_t cnt(const bool enq_flag);
- const txn_data_list get_tdata_list_nolock(const std::string& xid);
+ const txn_data_list_t get_tdata_list_nolock(const std::string& xid);
};
}}}
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp Thu Jan 23 10:15:46 2014
@@ -220,11 +220,9 @@ txn_rec::decode(::rec_hdr_t& h, std::ifs
}
}
ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
- if (::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, 0)) { // TODO: add checksum
- throw jexception(jerrno::JERR_JREC_BADRECTAIL); // TODO: complete exception detail
- }
assert(!ifsp->fail() && !ifsp->bad());
assert(_txn_hdr._xidsize > 0);
+
Checksum checksum;
checksum.addData((unsigned char*)&_txn_hdr, sizeof(_txn_hdr));
checksum.addData((unsigned char*)_buff, _txn_hdr._xidsize);
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/deq_hdr.c
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/deq_hdr.c?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/deq_hdr.c (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/deq_hdr.c Thu Jan 23 10:15:46 2014
@@ -41,6 +41,6 @@ bool is_txn_coml_commit(const deq_hdr_t
}
void set_txn_coml_commit(deq_hdr_t *dh, const bool commit) {
- dh->_rhdr._uflag = commit ? dh->_rhdr._uflag | DEQ_HDR_TXNCMPLCOMMIT_MASK :
- dh->_rhdr._uflag & (~DEQ_HDR_TXNCMPLCOMMIT_MASK);
+ dh->_rhdr._uflag = commit ? dh->_rhdr._uflag | DEQ_HDR_TXNCMPLCOMMIT_MASK : // set flag bit
+ dh->_rhdr._uflag & (~DEQ_HDR_TXNCMPLCOMMIT_MASK); // unset flag bit
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.c Thu Jan 23 10:15:46 2014
@@ -75,6 +75,7 @@ void file_hdr_copy(file_hdr_t* dest, con
void file_hdr_reset(file_hdr_t* target) {
target->_rhdr._uflag = 0;
+ target->_rhdr._serial = 0;
target->_rhdr._rid = 0;
target->_fro = 0;
target->_ts_sec = 0;
@@ -85,6 +86,7 @@ void file_hdr_reset(file_hdr_t* target)
int is_file_hdr_reset(file_hdr_t* target) {
return target->_rhdr._uflag == 0 &&
+ target->_rhdr._serial == 0 &&
target->_rhdr._rid == 0 &&
target->_ts_sec == 0 &&
target->_ts_nsec == 0 &&
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/file_hdr.h Thu Jan 23 10:15:46 2014
@@ -51,7 +51,7 @@ extern "C"{
* +---+---+---+---+---+---+---+---+ |
* | rid | |
* +---+---+---+---+---+---+---+---+ -+
- * | fs | partn | reserved |
+ * | fhs | partn | reserved |
* +---+---+---+---+---+---+---+---+
* | data-size |
* +---+---+---+---+---+---+---+---+
@@ -70,7 +70,7 @@ extern "C"{
*
* ver = Journal version
* rid = Record ID
- * fs = File header size in sblks (defined by JRNL_SBLK_SIZE)
+ * fhs = File header size in sblks (defined by JRNL_SBLK_SIZE)
* partn = EFP partition from which this file came
* fro = First Record Offset
* qnl = Length of the queue name in octets.
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/rec_hdr.c
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/rec_hdr.c?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/rec_hdr.c (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/utils/rec_hdr.c Thu Jan 23 10:15:46 2014
@@ -1,3 +1,24 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
#include "rec_hdr.h"
void rec_hdr_init(rec_hdr_t* dest, const uint32_t magic, const uint16_t version, const uint16_t uflag, const uint64_t serial, const uint64_t rid) {
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/wmgr.cpp Thu Jan 23 10:15:46 2014
@@ -108,6 +108,7 @@ wmgr::enqueue(const void* const data_buf
data_tok* dtokp,
const void* const xid_ptr,
const std::size_t xid_len,
+ const bool tpc_flag,
const bool transient,
const bool external)
{
@@ -196,7 +197,7 @@ wmgr::enqueue(const void* const data_buf
if (xid_len) // If part of transaction, add to transaction map
{
std::string xid((const char*)xid_ptr, xid_len);
- _tmap.insert_txn_data(xid, txn_data_t(rid, 0, dtokp->fid(), 0, true));
+ _tmap.insert_txn_data(xid, txn_data_t(rid, 0, dtokp->fid(), 0, true, tpc_flag, false));
}
else
{
@@ -228,6 +229,7 @@ iores
wmgr::dequeue(data_tok* dtokp,
const void* const xid_ptr,
const std::size_t xid_len,
+ const bool tpc_flag,
const bool txn_coml_commit)
{
if (xid_len)
@@ -312,7 +314,7 @@ wmgr::dequeue(data_tok* dtokp,
// If the enqueue is part of a pending txn, it will not yet be in emap
_emap.lock(dequeue_rid); // ignore rid not found error
std::string xid((const char*)xid_ptr, xid_len);
- _tmap.insert_txn_data(xid, txn_data_t(rid, dequeue_rid, dtokp->fid(), 0, false));
+ _tmap.insert_txn_data(xid, txn_data_t(rid, dequeue_rid, dtokp->fid(), 0, false, tpc_flag, false));
}
else
{
@@ -424,8 +426,8 @@ wmgr::abort(data_tok* dtokp,
// Delete this txn from tmap, unlock any locked records in emap
std::string xid((const char*)xid_ptr, xid_len);
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
- for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ txn_data_list_t tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+ for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++)
{
if (!itr->enq_flag_)
_emap.unlock(itr->drid_); // ignore rid not found error
@@ -523,8 +525,8 @@ wmgr::commit(data_tok* dtokp,
// Delete this txn from tmap, process records into emap
std::string xid((const char*)xid_ptr, xid_len);
- txn_data_list tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
- for (tdl_itr itr = tdl.begin(); itr != tdl.end(); itr++)
+ txn_data_list_t tdl = _tmap.get_remove_tdata_list(xid); // tdl will be empty if xid not found
+ for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++)
{
if (itr->enq_flag_) // txn enqueue
{
@@ -621,8 +623,8 @@ wmgr::flush_check(iores& res,
}
// If file is full, rotate to next file
- uint32_t fileSize_pgs = _lfc.fileSize_sblks() / _cache_pgsize_sblks;
- if (_pg_cntr >= fileSize_pgs)
+ uint32_t dataSize_pgs = _lfc.dataSize_sblks() / _cache_pgsize_sblks;
+ if (_pg_cntr >= dataSize_pgs)
{
//std::cout << _pg_cntr << ">=" << fileSize_pgs << std::flush;
get_next_file();
@@ -638,8 +640,8 @@ iores
wmgr::flush()
{
iores res = write_flush();
- uint32_t fileSize_pgs = _lfc.fileSize_sblks() / _cache_pgsize_sblks;
- if (res == RHM_IORES_SUCCESS && _pg_cntr >= fileSize_pgs) {
+ uint32_t dataSize_pgs = _lfc.dataSize_sblks() / _cache_pgsize_sblks;
+ if (res == RHM_IORES_SUCCESS && _pg_cntr >= dataSize_pgs) {
get_next_file();
}
return res;
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/wmgr.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/wmgr.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/wmgr.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/wmgr.h Thu Jan 23 10:15:46 2014
@@ -97,11 +97,13 @@ public:
data_tok* dtokp,
const void* const xid_ptr,
const std::size_t xid_len,
+ const bool tpc_flag,
const bool transient,
const bool external);
iores dequeue(data_tok* dtokp,
const void* const xid_ptr,
const std::size_t xid_len,
+ const bool tpc_flag,
const bool txn_coml_commit);
iores abort(data_tok* dtokp,
const void* const xid_ptr,
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Logger.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Logger.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Logger.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Logger.cpp Thu Jan 23 10:15:46 2014
@@ -68,7 +68,7 @@ Logger::Logger() : flags(0) {
// Initialize myself from env variables so all programs
// (e.g. tests) can use logging even if they don't parse
// command line args.
- Options opts("");
+ Options opts;
opts.parse(0, 0);
configure(opts);
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Options.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Options.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Options.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Options.cpp Thu Jan 23 10:15:46 2014
@@ -27,8 +27,6 @@
namespace qpid {
namespace log {
-using namespace std;
-
Options::Options(const std::string& argv0_, const std::string& name_) :
qpid::Options(name_),
argv0(argv0_),
@@ -45,25 +43,15 @@ Options::Options(const std::string& argv
{
selectors.push_back("notice+");
- ostringstream levels;
- levels << LevelTraits::name(Level(0));
- for (int i = 1; i < LevelTraits::COUNT; ++i)
- levels << " " << LevelTraits::name(Level(i));
-
- ostringstream categories;
- categories << CategoryTraits::name(Category(0));
- for (int i = 1; i < CategoryTraits::COUNT; ++i)
- categories << " " << CategoryTraits::name(Category(i));
-
addOptions()
("trace,t", optValue(trace), "Enables all logging" )
("log-enable", optValue(selectors, "RULE"),
("Enables logging for selected levels and components. "
"RULE is in the form 'LEVEL[+-][:PATTERN]'\n"
- "LEVEL is one of: \n\t "+levels.str()+"\n"
+ "LEVEL is one of: \n\t "+getLevels()+"\n"
"PATTERN is a logging category name, or a namespace-qualified "
"function name or name fragment. "
- "Logging category names are: \n\t "+categories.str()+"\n"
+ "Logging category names are: \n\t "+getCategories()+"\n"
"For example:\n"
"\t'--log-enable warning+'\n"
"logs all warning, error and critical messages.\n"
@@ -75,10 +63,10 @@ Options::Options(const std::string& argv
("log-disable", optValue(deselectors, "RULE"),
("Disables logging for selected levels and components. "
"RULE is in the form 'LEVEL[+-][:PATTERN]'\n"
- "LEVEL is one of: \n\t "+levels.str()+"\n"
+ "LEVEL is one of: \n\t "+getLevels()+"\n"
"PATTERN is a logging category name, or a namespace-qualified "
"function name or name fragment. "
- "Logging category names are: \n\t "+categories.str()+"\n"
+ "Logging category names are: \n\t "+getCategories()+"\n"
"For example:\n"
"\t'--log-disable warning-'\n"
"disables logging all warning, notice, info, debug, and trace messages.\n"
@@ -139,4 +127,22 @@ Options& Options::operator=(const Option
return *this;
}
+std::string getLevels()
+{
+ std::ostringstream levels;
+ levels << LevelTraits::name(Level(0));
+ for (int i = 1; i < LevelTraits::COUNT; ++i)
+ levels << " " << LevelTraits::name(Level(i));
+ return levels.str();
+}
+
+std::string getCategories()
+{
+ std::ostringstream categories;
+ categories << CategoryTraits::name(Category(0));
+ for (int i = 1; i < CategoryTraits::COUNT; ++i)
+ categories << " " << CategoryTraits::name(Category(i));
+ return categories.str();
+}
+
}} // namespace qpid::log
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Options.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Options.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Options.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Options.h Thu Jan 23 10:15:46 2014
@@ -46,6 +46,12 @@ struct Options : public qpid::Options {
std::auto_ptr<SinkOptions> sinkOptions;
};
+/** Get a string list of the allowed levels */
+QPID_COMMON_EXTERN std::string getLevels();
+
+/** Get a string list of the allowed categories */
+QPID_COMMON_EXTERN std::string getCategories();
+
}} // namespace qpid::log
#endif /*!QPID_LOG_OPTIONS_H*/
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Selector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Selector.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Selector.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Selector.cpp Thu Jan 23 10:15:46 2014
@@ -220,6 +220,8 @@ bool Selector::isDisabled(Level level, c
// level/function/category set from an actual QPID_LOG Statement.
//
bool Selector::isEnabled(Level level, const char* function, Category category) {
+ if (level==critical)
+ return true; // critical cannot be disabled
if (isDisabled(level, function))
return false; // Disabled by function name
if (disableFlags[level][category])
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Statement.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Statement.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Statement.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Statement.cpp Thu Jan 23 10:15:46 2014
@@ -200,7 +200,7 @@ const char* names[LevelTraits::COUNT] =
const char* catNames[CategoryTraits::COUNT] = {
"Security", "Broker", "Management", "Protocol", "System", "HA", "Messaging",
- "Store", "Network", "Test", "Client", "Model", "Unspecified"
+ "Store", "Network", "Test", "Client", "Application", "Model", "Unspecified"
};
} // namespace
@@ -235,4 +235,5 @@ Category CategoryTraits::category(const
const char* CategoryTraits::name(Category c) {
return catNames[c];
}
+
}} // namespace qpid::log
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Statement.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Statement.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Statement.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/log/Statement.h Thu Jan 23 10:15:46 2014
@@ -71,11 +71,12 @@ struct LevelTraits {
* Store store
* Network tcp rdma AsynchIO socket epoll
* Test
+ * External_application <no directory - signifies log message from non-qpid application code>
* Model <not related to a directory>
* Unspecified <must be last in enum>
*/
enum Category { security, broker, management, protocol, system, ha, messaging,
- store, network, test, client, model, unspecified };
+ store, network, test, client, external_application, model, unspecified };
struct CategoryTraits {
static const int COUNT=unspecified+1;
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.cpp Thu Jan 23 10:15:46 2014
@@ -47,6 +47,18 @@ namespace qpid {
namespace messaging {
namespace amqp {
namespace {
+
+std::string asString(const std::vector<std::string>& v) {
+ std::stringstream os;
+ os << "[";
+ for(std::vector<std::string>::const_iterator i = v.begin(); i != v.end(); ++i ) {
+ if (i != v.begin()) os << ", ";
+ os << '"' << *i << '"';
+ }
+ os << "]";
+ return os.str();
+}
+
//remove conditional when 0.5 is no longer supported
#ifdef HAVE_PROTON_TRACER
void do_trace(pn_transport_t* transport, const char* message)
@@ -437,27 +449,33 @@ void ConnectionContext::reset()
pn_transport_bind(engine, connection);
}
-void ConnectionContext::check()
-{
- if (state == DISCONNECTED) {
+void ConnectionContext::check() {
+ if (checkDisconnected()) {
if (ConnectionOptions::reconnect) {
- reset();
autoconnect();
} else {
throw qpid::messaging::TransportFailure("Disconnected (reconnect disabled)");
}
}
- if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
- pn_condition_t* error = pn_connection_remote_condition(connection);
- std::stringstream text;
- if (pn_condition_is_set(error)) {
- text << "Connection closed by peer with " << pn_condition_get_name(error) << ": " << pn_condition_get_description(error);
- } else {
- text << "Connection closed by peer";
+}
+
+bool ConnectionContext::checkDisconnected() {
+ if (state == DISCONNECTED) {
+ reset();
+ } else {
+ if ((pn_connection_state(connection) & REQUIRES_CLOSE) == REQUIRES_CLOSE) {
+ pn_condition_t* error = pn_connection_remote_condition(connection);
+ std::stringstream text;
+ if (pn_condition_is_set(error)) {
+ text << "Connection closed by peer with " << pn_condition_get_name(error) << ": " << pn_condition_get_description(error);
+ } else {
+ text << "Connection closed by peer";
+ }
+ pn_connection_close(connection);
+ throw qpid::messaging::ConnectionError(text.str());
}
- pn_connection_close(connection);
- throw qpid::messaging::ConnectionError(text.str());
}
+ return state == DISCONNECTED;
}
void ConnectionContext::wait()
@@ -843,16 +861,6 @@ void ConnectionContext::open()
namespace {
-std::string asString(const std::vector<std::string>& v) {
- std::stringstream os;
- os << "[";
- for(std::vector<std::string>::const_iterator i = v.begin(); i != v.end(); ++i ) {
- if (i != v.begin()) os << ", ";
- os << *i;
- }
- os << "]";
- return os.str();
-}
double FOREVER(std::numeric_limits<double>::max());
bool expired(const sys::AbsTime& start, double timeout)
{
@@ -894,6 +902,7 @@ bool ConnectionContext::tryConnect()
if (tryConnect(qpid::Url(*i, protocol.empty() ? qpid::Address::TCP : protocol))) {
return true;
}
+ QPID_LOG(info, "Failed to connect to " << *i);
} catch (const qpid::messaging::TransportFailure& e) {
QPID_LOG(info, "Failed to connect to " << *i << ": " << e.what());
}
@@ -923,6 +932,13 @@ void ConnectionContext::reconnect()
}
}
+void ConnectionContext::waitNoReconnect() {
+ if (!checkDisconnected()) {
+ lock.wait();
+ checkDisconnected();
+ }
+}
+
bool ConnectionContext::tryConnect(const Url& url)
{
if (url.getUser().size()) username = url.getUser();
@@ -934,10 +950,11 @@ bool ConnectionContext::tryConnect(const
setCurrentUrl(*i);
if (sasl.get()) {
wakeupDriver();
- while (!sasl->authenticated()) {
+ while (!sasl->authenticated() && state != DISCONNECTED) {
QPID_LOG(debug, id << " Waiting to be authenticated...");
- wait();
+ waitNoReconnect();
}
+ if (state == DISCONNECTED) continue;
QPID_LOG(debug, id << " Authenticated");
}
@@ -945,9 +962,10 @@ bool ConnectionContext::tryConnect(const
setProperties();
pn_connection_open(connection);
wakeupDriver(); //want to write
- while (pn_connection_state(connection) & PN_REMOTE_UNINIT) {
- wait();
- }
+ while ((pn_connection_state(connection) & PN_REMOTE_UNINIT) &&
+ state != DISCONNECTED)
+ waitNoReconnect();
+ if (state == DISCONNECTED) continue;
if (!(pn_connection_state(connection) & PN_REMOTE_ACTIVE)) {
throw qpid::messaging::ConnectionError("Failed to open connection");
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/messaging/amqp/ConnectionContext.h Thu Jan 23 10:15:46 2014
@@ -150,6 +150,8 @@ class ConnectionContext : public qpid::s
CodecAdapter codecAdapter;
void check();
+ bool checkDisconnected();
+ void waitNoReconnect();
void wait();
void waitUntil(qpid::sys::AbsTime until);
void wait(boost::shared_ptr<SessionContext>);
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/messaging/amqp/EncodedMessage.cpp Thu Jan 23 10:15:46 2014
@@ -145,7 +145,7 @@ void EncodedMessage::populate(qpid::type
map["x-amqp-group-id"] = groupId.str();
}
if (!!groupSequence) {
- map["x-amqp-qroup-sequence"] = groupSequence.get();
+ map["x-amqp-group-sequence"] = groupSequence.get();
}
if (replyToGroupId) {
map["x-amqp-reply-to-group-id"] = replyToGroupId.str();
Propchange: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/cpp/src/tests:r1549895-1558036
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/CMakeLists.txt?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/CMakeLists.txt (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/CMakeLists.txt Thu Jan 23 10:15:46 2014
@@ -42,6 +42,11 @@ macro(remember_location testname)
set (${testname}_LOCATION ${CMAKE_CURRENT_BINARY_DIR}/${testname}${CMAKE_EXECUTABLE_SUFFIX})
endmacro(remember_location)
+# If we're using GCC, allow variadic macros (even though they're C99, not C++98/03)
+if (CMAKE_COMPILER_IS_GNUCXX)
+ add_definitions(-Wno-variadic-macros)
+endif (CMAKE_COMPILER_IS_GNUCXX)
+
# Windows uses some process-startup calls to ensure that errors, etc. don't
# result in error boxes being thrown up. Since it's expected that most test
# runs will be in scripts, the default is to force these outputs to stderr
@@ -175,6 +180,7 @@ set(all_unit_tests
ManagementTest
MessageReplayTracker
MessageTest
+ MessagingLogger
MessagingSessionTests
PollableCondition
ProxyTest
@@ -294,16 +300,12 @@ if (BUILD_SASL)
remember_location(sasl_version)
endif (BUILD_SASL)
-# This should ideally be done as part of the test run, but I don't know a way
-# to get these arguments and the working directory set like Makefile.am does,
-# and have that run during the test pass.
-#
-# Need to check to see that the python tools are included as part of the source
-# tree first and don't install them or run dependent tests if they are not there
-#
+# Always run the python install; setup.py is smart enough to do only what is needed.
set (python_bld ${CMAKE_CURRENT_BINARY_DIR}/python)
-execute_process(COMMAND ${PYTHON_EXECUTABLE} setup.py install --prefix=${pythoon_bld} --install-lib=${python_bld} --install-scripts=${python_bld}/commands
- WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}/../python)
+set (python_src ${CMAKE_SOURCE_DIR}/../python)
+add_custom_target(python_bld ALL
+ COMMAND ${PYTHON_EXECUTABLE} setup.py install --prefix=${python_bld} --install-lib=${python_bld} --install-scripts=${python_bld}/commands
+ WORKING_DIRECTORY ${python_src})
if (BUILD_SASL)
add_test (sasl_fed ${test_wrap} -- ${CMAKE_CURRENT_SOURCE_DIR}/sasl_fed${test_script_suffix})
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/brokertest.py?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/brokertest.py Thu Jan 23 10:15:46 2014
@@ -253,7 +253,7 @@ class Broker(Popen):
self.test = test
self._port=port
- if BrokerTest.store_lib:
+ if BrokerTest.store_lib and not test_store:
args = args + ['--load-module', BrokerTest.store_lib]
if BrokerTest.sql_store_lib:
args = args + ['--load-module', BrokerTest.sql_store_lib]
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/config.null
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/config.null?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/config.null (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/config.null Thu Jan 23 10:15:46 2014
@@ -1 +1,21 @@
-# empty config
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# Deliberately empty configuration file for tests.
+
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/failing-amqp0-10-python-tests
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/failing-amqp0-10-python-tests?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/failing-amqp0-10-python-tests (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/failing-amqp0-10-python-tests Thu Jan 23 10:15:46 2014
@@ -1,3 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
#The following four tests fail because the pure python client excludes
#the node type for queues from the reply-to address, whereas the swigged
#client does not (as that prevents it resolving the node on every send)
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/failing-amqp1.0-python-tests
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/failing-amqp1.0-python-tests?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/failing-amqp1.0-python-tests (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/failing-amqp1.0-python-tests Thu Jan 23 10:15:46 2014
@@ -1,2 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
qpid_tests.broker_0_10.new_api.GeneralTests.test_qpid_3481_acquired_to_alt_exchange_2_consumers
qpid_tests.broker_0_10.new_api.GeneralTests.test_qpid_3481_acquired_to_alt_exchange
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/ha_tests.py?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/ha_tests.py Thu Jan 23 10:15:46 2014
@@ -233,6 +233,33 @@ class ReplicationTests(HaBrokerTest):
c.close()
finally: l.restore()
+
+ def test_heartbeat_python(self):
+ """Verify that a python client with a heartbeat specified disconnects
+ from a stalled broker and does not hang indefinitely."""
+
+ broker = Broker(self)
+ broker_addr = broker.host_port()
+
+ # Case 1: Connect before stalling the broker, use the connection after stalling.
+ c = Connection(broker_addr, heartbeat=1)
+ c.open()
+ os.kill(broker.pid, signal.SIGSTOP) # Stall the broker
+ self.assertRaises(ConnectionError, c.session().sender, "foo")
+
+ # Case 2: Connect to a stalled broker
+ c = Connection(broker_addr, heartbeat=1)
+ self.assertRaises(ConnectionError, c.open)
+
+ # Case 3: Re-connect to a stalled broker.
+ broker2 = Broker(self)
+ c = Connection(broker2.host_port(), heartbeat=1, reconnect_limit=1,
+ reconnect=True, reconnect_urls=[broker_addr],
+ reconnect_log=False) # Hide expected warnings
+ c.open()
+ broker2.kill() # Cause re-connection to broker
+ self.assertRaises(ConnectionError, c.session().sender, "foo")
+
def test_failover_cpp(self):
"""Verify that failover works in the C++ client."""
cluster = HaCluster(self, 2)
@@ -253,8 +280,6 @@ class ReplicationTests(HaBrokerTest):
"""Verify that a backup broker fails over and recovers queue state"""
brokers = HaCluster(self, 3)
brokers[0].connect().session().sender("q;{create:always}").send("a")
- for b in brokers[1:]: b.assert_browse_backup("q", ["a"], msg=b)
- brokers[0].expect = EXPECT_EXIT_FAIL
brokers.kill(0)
brokers[1].connect().session().sender("q").send("b")
brokers[2].assert_browse_backup("q", ["a","b"])
@@ -263,6 +288,13 @@ class ReplicationTests(HaBrokerTest):
s.acknowledge()
brokers[2].assert_browse_backup("q", ["b"])
+ def test_empty_backup_failover(self):
+ """Verify that a new primary becomes active with no queues.
+ Regression test for QPID-5430"""
+ brokers = HaCluster(self, 3)
+ brokers.kill(0)
+ brokers[1].wait_status("active")
+
def test_qpid_config_replication(self):
"""Set up replication via qpid-config"""
brokers = HaCluster(self,2)
@@ -272,33 +304,34 @@ class ReplicationTests(HaBrokerTest):
def test_standalone_queue_replica(self):
"""Test replication of individual queues outside of cluster mode"""
- l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
- try:
- primary = HaBroker(self, name="primary", ha_cluster=False,
- args=["--ha-queue-replication=yes"]);
- pc = primary.connect()
- ps = pc.session().sender("q;{create:always}")
- pr = pc.session().receiver("q;{create:always}")
- backup = HaBroker(self, name="backup", ha_cluster=False,
- args=["--ha-queue-replication=yes"])
- br = backup.connect().session().receiver("q;{create:always}")
+ primary = HaBroker(self, name="primary", ha_cluster=False,
+ args=["--ha-queue-replication=yes"]);
+ pc = primary.connect()
+ ps = pc.session().sender("q;{create:always}")
+ pr = pc.session().receiver("q;{create:always}")
+ backup = HaBroker(self, name="backup", ha_cluster=False,
+ args=["--ha-queue-replication=yes"])
+ bs = backup.connect().session()
+ br = bs.receiver("q;{create:always}")
+
+ def srange(*args): return [str(i) for i in xrange(*args)]
+
+ for m in srange(3): ps.send(m)
+ # Set up replication with qpid-ha
+ backup.replicate(primary.host_port(), "q")
+ backup.assert_browse_backup("q", srange(3))
+ for m in srange(3,6): ps.send(str(m))
+ backup.assert_browse_backup("q", srange(6))
+ self.assertEqual("0", pr.fetch().content)
+ pr.session.acknowledge()
+ backup.assert_browse_backup("q", srange(1,6))
+
+ # Set up replication with qpid-config
+ ps2 = pc.session().sender("q2;{create:always}")
+ backup.config_replicate(primary.host_port(), "q2");
+ ps2.send("x", timeout=1)
+ backup.assert_browse_backup("q2", ["x"])
- # Set up replication with qpid-ha
- backup.replicate(primary.host_port(), "q")
- ps.send("a", timeout=1)
- backup.assert_browse_backup("q", ["a"])
- ps.send("b", timeout=1)
- backup.assert_browse_backup("q", ["a", "b"])
- self.assertEqual("a", pr.fetch().content)
- pr.session.acknowledge()
- backup.assert_browse_backup("q", ["b"])
-
- # Set up replication with qpid-config
- ps2 = pc.session().sender("q2;{create:always}")
- backup.config_replicate(primary.host_port(), "q2");
- ps2.send("x", timeout=1)
- backup.assert_browse_backup("q2", ["x"])
- finally: l.restore()
def test_standalone_queue_replica_failover(self):
"""Test individual queue replication from a cluster to a standalone
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/linearstore/linearstoredirsetup.sh Thu Jan 23 10:15:46 2014
@@ -1,5 +1,25 @@
#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
STORE_DIR=/tmp
LINEARSTOREDIR=~/RedHat/linearstore
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/policy.acl
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/policy.acl?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/policy.acl (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/policy.acl Thu Jan 23 10:15:46 2014
@@ -1 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
acl allow all all
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/qpid-send.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/qpid-send.cpp?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/qpid-send.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/qpid-send.cpp Thu Jan 23 10:15:46 2014
@@ -242,7 +242,7 @@ class GetlineContentGenerator : public C
virtual bool setContent(Message& msg) {
string content;
bool got = getline(std::cin, content);
- if (got) msg.setContent(content);
+ if (got) msg.setContentObject(content);
return got;
}
};
@@ -251,7 +251,7 @@ class FixedContentGenerator : public C
public:
FixedContentGenerator(const string& s) : content(s) {}
virtual bool setContent(Message& msg) {
- msg.setContent(content);
+ msg.setContentObject(content);
return true;
}
private:
Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/qpidd-empty.conf
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/qpidd-empty.conf?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/qpidd-empty.conf (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/tests/qpidd-empty.conf Thu Jan 23 10:15:46 2014
@@ -1,3 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
# An empty configuration file.
# Used when running tests to avoid picking up configuration
# installed in the default place.
Modified: qpid/branches/java-broker-bdb-ha/qpid/doc/book/Makefile
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/doc/book/Makefile?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/doc/book/Makefile (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/doc/book/Makefile Thu Jan 23 10:15:46 2014
@@ -17,7 +17,7 @@
# under the License.
#
-DIRS = src/java-broker src/java-perftests src/cpp-broker src/programming
+DIRS = src/java-broker src/java-perftests src/cpp-broker src/programming src/jms-client-0-8
.PHONY: all $(DIRS)
Modified: qpid/branches/java-broker-bdb-ha/qpid/doc/book/src/java-broker/Java-Broker-Close-On-No-Route.xml
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/doc/book/src/java-broker/Java-Broker-Close-On-No-Route.xml?rev=1560618&r1=1560617&r2=1560618&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/doc/book/src/java-broker/Java-Broker-Close-On-No-Route.xml (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/doc/book/src/java-broker/Java-Broker-Close-On-No-Route.xml Thu Jan 23 10:15:46 2014
@@ -1,4 +1,24 @@
<?xml version="1.0" encoding="utf-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
<section id="Java-Broker-Close-Connection-When-No-Route">
<title>Closing client connections on unroutable mandatory messages</title>
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org