You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2014/02/12 14:27:57 UTC

svn commit: r1567616 [2/12] - in /qpid/branches/java-broker-bdb-ha: ./ qpid/ qpid/cpp/bindings/qmf2/ruby/ qpid/cpp/bindings/qpid/examples/perl/ qpid/cpp/bindings/qpid/perl/ qpid/cpp/bindings/qpid/perl/lib/qpid/messaging/ qpid/cpp/bindings/qpid/ruby/ qp...

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Wed Feb 12 13:27:51 2014
@@ -39,6 +39,7 @@
 #include "qpid/Msg.h"
 #include "qpid/assert.h"
 #include <boost/shared_ptr.hpp>
+#include <boost/weak_ptr.hpp>
 #include <boost/bind.hpp>
 
 
@@ -90,7 +91,8 @@ class QueueReplicator::ErrorListener : p
         QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what());
     }
     void incomingExecutionException(ErrorCode code, const std::string& msg) {
-        if (!queueReplicator->deletedOnPrimary(code, msg))
+        boost::shared_ptr<QueueReplicator> qr = queueReplicator.lock();
+        if (qr && !qr->deletedOnPrimary(code, msg))
             QPID_LOG(error, logPrefix << "Incoming "
                      << framing::createSessionException(code, msg).what());
     }
@@ -98,7 +100,7 @@ class QueueReplicator::ErrorListener : p
         QPID_LOG(debug, logPrefix << "Session detached");
     }
   private:
-    boost::shared_ptr<QueueReplicator> queueReplicator;
+    boost::weak_ptr<QueueReplicator> queueReplicator;
     std::string logPrefix;
 };
 
@@ -112,9 +114,12 @@ class QueueReplicator::QueueObserver : p
     void consumerAdded( const Consumer& ) {}
     void consumerRemoved( const Consumer& ) {}
     // Queue observer is destroyed when the queue is.
-    void destroy() { queueReplicator->destroy(); }
+    void destroy() {
+        boost::shared_ptr<QueueReplicator> qr = queueReplicator.lock();
+        if (qr) qr->destroy();
+    }
   private:
-    boost::shared_ptr<QueueReplicator> queueReplicator;
+    boost::weak_ptr<QueueReplicator> queueReplicator;
 };
 
 boost::shared_ptr<QueueReplicator> QueueReplicator::create(
@@ -171,8 +176,7 @@ void QueueReplicator::initialize() {
         throw Exception(QPID_MSG("Duplicate queue replicator " << getName()));
 
     // Enable callback to initializeBridge
-    std::pair<Bridge::shared_ptr, bool> result =
-    queue->getBroker()->getLinks().declare(
+    boost::shared_ptr<Bridge> b = queue->getBroker()->getLinks().declare(
         bridgeName,
         *link,
         false,              // durable
@@ -189,10 +193,10 @@ void QueueReplicator::initialize() {
         // Include shared_ptr to self to ensure we are not deleted
         // before initializeBridge is called.
         boost::bind(&QueueReplicator::initializeBridge, shared_from_this(), _1, _2)
-    );
-    bridge = result.first;
-    bridge->setErrorListener(
+    ).first;
+    b->setErrorListener(
         boost::shared_ptr<ErrorListener>(new ErrorListener(shared_from_this())));
+    bridge = b;                 // bridge is a weak_ptr to avoid a cycle.
 
     // Enable callback to destroy()
     queue->getObservers().add(
@@ -211,7 +215,7 @@ void QueueReplicator::destroy() {
     {
         Mutex::ScopedLock l(lock);
         if (!queue) return;     // Already destroyed
-        bridge2 = bridge;       // call close outside the lock.
+        bridge2 = bridge.lock(); // !call close outside the lock.
         destroy(l);
     }
     if (bridge2) bridge2->close(); // Outside of lock, avoid deadlock.

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueReplicator.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueReplicator.h?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueReplicator.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/ha/QueueReplicator.h Wed Feb 12 13:27:51 2014
@@ -112,7 +112,7 @@ class QueueReplicator : public broker::E
     const BrokerInfo brokerInfo;
     DispatchMap dispatch;
     boost::shared_ptr<broker::Link> link;
-    boost::shared_ptr<broker::Bridge> bridge;
+    boost::weak_ptr<broker::Bridge> bridge;
     boost::shared_ptr<broker::Queue> queue;
     broker::SessionHandler* sessionHandler;
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/ISSUES
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/ISSUES?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/ISSUES (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/ISSUES Wed Feb 12 13:27:51 2014
@@ -26,6 +26,7 @@ Current/pending:
    5359 -        Linearstore: Implement new management schema and wire into store
    5360 -        Linearstore: Evaluate and rework logging to produce a consistent log output
    5361 -        Linearstore: No tests for linearstore functionality currently exist
+                   svn r.1564893 2014-02-05: Added tx-test-soak.sh
                    * No existing tests for linearstore:
                    ** Basic broker-level tests for txn and non-txn recovery
                    ** Store-level tests which check write boundary conditions
@@ -34,29 +35,29 @@ Current/pending:
                    ** Basic performance tests
    5362 -        Linearstore: No store tools exist for examining the journals
                    svn r.1558888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up.
+                   svn r.1560530 2014-01-22: Bugfixes for qpid_qls_analyze
+                   svn r.1561848 2014-01-27: Bugfixes and enhancements for qpid_qls_analyze
+                   svn r.1564808 2014-02-05: Bugfixes and enhancements for qpid_qls_analyze
                    * Store analysis and status
                    * Recovery/reading of message content
                    * Empty file pool status and management
    5464 -        [linearstore] Incompletely created journal files accumulate in EFP
-   5479 1053701  [linearstore] Using recovered store results in "JERR_JNLF_FILEOFFSOVFL: Attempted to increase submitted offset past file size. (JournalFile::submittedDblkCount)" error message
-                   * Probablilty: 2 of 600 (0.3%) using tx-test-soak.sh
-   5480 1053749  [linearstore] Recovery of store failure with "JERR_MAP_NOTFOUND: Key not found in map." error message
-                   * Probability: 6 of 600 (1.0%) using tx-test-soak.sh
-                   * If broker is started a second time after failure, it starts correctly and test completes ok.
    5484 1035843  Slow performance for producers
-                   svn r.1558592 fixes an issue with using /dev/random as a source of random numbers for Journal serial numbers.
-   -    1036026  [LinearStore] Qpid linear store unable to create durable queue - framing-error: Queue <q-name>: create() failed: jexception 0x0000
+                   svn r.1558592 2014-01-15 fixes an issue with using /dev/random as a source of random numbers for Journal serial numbers.
+                   svn r.1558913 2014-01-16 replaces use of /dev/urandom with several calls to rand() to construct a 64-bit random number.
+                   * Recommend rebuilding and testing for performance again with these two fixes. Marked POST.
+#  -    1036026  [LinearStore] Qpid linear store unable to create durable queue - framing-error: Queue <q-name>: create() failed: jexception 0x0000
                    UNABLE TO REPRODUCE - but Frantizek has additional info
    -    1039522  Qpid crashes while recovering from linear store around apid::linearstore::journal::JournalFile::getFqFileName() including enq_rec::decode() threw JERR_JREC_BAD_RECTAIL
                    * Possible dup of 1039525
-                   * May be fixed by QPID-5483 - waiting for needinfo
+                   * May be fixed by QPID-5483 - waiting for needinfo, recommend rebuilding with QPID-5483 fix and re-testing
    -    1039525  Qpid crashes while recovering from linear store around apid::linearstore::journal::jexception::format including enq_rec::decode() threw JERR_JREC_BAD_REC_TAIL
                    * Possible dup of 1039522
-                   * May be fixed by QPID-5483 - waiting for needinfo
-   5487 -        [linearstore] Replace use of /dev/urandom with c random generator calls
+                   * May be fixed by QPID-5483 - waiting for needinfo, recommend rebuilding with QPID-5483 fix and re-testing
+#  -    1049870  [LinearStore] auto-delete property does not survive restart
 
-Fixed/closed:
-=============
+Fixed/closed (in commit order):
+===============================
  Q-JIRA RHBZ     Description / Comments
  ------ -------  ----------------------
    5357 1052518  Linearstore: Empty file recycling not functional
@@ -85,6 +86,17 @@ NO-JIRA -        Added missing Apache co
                    svn r.1558589 2014-01-15: Proposed fix
                    * May be linked to RHBZ 1039522 - waiting for needinfo
                    * May be linked to RHBZ 1039525 - waiting for needinfo
+   5487 1054448  [linearstore] Replace use of /dev/urandom with c random generator calls
+                   svn r.1558913 2014-01-16: Proposed fix
+   5480 1053749  [linearstore] Recovery of store failure with "JERR_MAP_NOTFOUND: Key not found in map." error message
+                   svn r.1564877 2014-02-05: Proposed fix
+                   * Probability: 6 of 600 (1.0%) using tx-test-soak.sh
+                   * If broker is started a second time after failure, it starts correctly and test completes ok.
+                   * Problem: File is being recycled to EFP with still-locked enqueues in it (ie dequeued transactionally).
+                   * Problem: Record alignment check writes filler records to wrong file when decoding bad record moves across a file boundary
+   5479 1053701  [linearstore] Using recovered store results in "JERR_JNLF_FILEOFFSOVFL: Attempted to increase submitted offset past file size. (JournalFile::submittedDblkCount)" error message
+                   * Probability: 2 of 600 (0.3%) using tx-test-soak.sh
+                   * Fixed by checkin for QPID-5480, no longer able to reproduce. Marked POST.
 
 Future:
 =======
@@ -101,3 +113,4 @@ Code tidy-up
 * Member names: xxx_
 * Rename classes, functions and variables to camel-case
 * Add Doxygen docs to classes
+* Make fid's consistent in name (fid, file_id, pfid) and format (hex vs decimal)

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp Wed Feb 12 13:27:51 2014
@@ -593,7 +593,7 @@ void MessageStoreImpl::recover(qpid::bro
     std::ostringstream oss;
     oss << "Recovered transaction prepared list:";
     for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
-        oss << std::endl << "     " << str2hexnum(i->xid);
+        oss << std::endl << "     " << qpid::linearstore::journal::jcntl::str2hexnum(i->xid);
     }
     QLS_LOG(debug, oss.str());
 
@@ -1292,7 +1292,7 @@ void MessageStoreImpl::completed(TxnCtxt
                 mgmtObject->inc_tplTxnAborts();
         }
     } catch (const std::exception& e) {
-        QLS_LOG(error, "Error completing xid " << txn_.getXid() << ": " << e.what());
+        QLS_LOG(error, "Error completing xid " << qpid::linearstore::journal::jcntl::str2hexnum(txn_.getXid()) << ": " << e.what());
         throw;
     }
 }
@@ -1516,15 +1516,6 @@ void MessageStoreImpl::journalDeleted(Jo
     journalList.erase(j_.id());
 }
 
-std::string MessageStoreImpl::str2hexnum(const std::string& str) {
-    std::ostringstream oss;
-    oss << "(" << str.size() << ")0x" << std::hex;
-    for (unsigned i=str.size(); i>0; --i) {
-        oss << std::setfill('0') << std::setw(2) << (uint16_t)(uint8_t)str[i-1];
-    }
-    return oss.str();
-}
-
 MessageStoreImpl::StoreOptions::StoreOptions(const std::string& name_) :
                                              qpid::Options(name_),
                                              truncateFlag(defTruncateFlag),

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h Wed Feb 12 13:27:51 2014
@@ -235,8 +235,6 @@ class MessageStoreImpl : public qpid::br
     }
     void chkTplStoreInit();
 
-    static std::string str2hexnum(const std::string& str);
-
   public:
     typedef boost::shared_ptr<MessageStoreImpl> shared_ptr;
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/TxnCtxt.cpp Wed Feb 12 13:27:51 2014
@@ -52,7 +52,9 @@ void TxnCtxt::commitTxn(JournalImpl* jc,
                 jc->txn_abort(dtokp.get(), getXid());
             }
         } catch (const qpid::linearstore::journal::jexception& e) {
-            THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
+            std::ostringstream oss;
+            oss << "Error during " << (commit ? "commit" : "abort") << ": " << e.what();
+            THROW_STORE_EXCEPTION(oss.str());
         }
     }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/JournalFile.cpp Wed Feb 12 13:27:51 2014
@@ -27,18 +27,18 @@
 #include "qpid/linearstore/journal/utils/file_hdr.h"
 #include <unistd.h>
 
-//#include <iostream> // DEBUG
-
 namespace qpid {
 namespace linearstore {
 namespace journal {
 
 JournalFile::JournalFile(const std::string& fqFileName,
                          const efpIdentity_t& efpIdentity,
-                         const uint64_t fileSeqNum) :
+                         const uint64_t fileSeqNum,
+                         const std::string queueName) :
             efpIdentity_(efpIdentity),
             fqFileName_(fqFileName),
             fileSeqNum_(fileSeqNum),
+            queueName_(queueName),
             serial_(getRandom64()),
             firstRecordOffset_(0ULL),
             fileHandle_(-1),
@@ -47,6 +47,7 @@ JournalFile::JournalFile(const std::stri
             fileHeaderPtr_(0),
             aioControlBlockPtr_(0),
             fileSize_dblks_(((efpIdentity.ds_ * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES),
+            initializedFlag_(false),
             enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0),
             submittedDblkCount_("JournalFile::submittedDblkCount", 0),
             completedDblkCount_("JournalFile::completedDblkCount", 0),
@@ -54,10 +55,12 @@ JournalFile::JournalFile(const std::stri
 {}
 
 JournalFile::JournalFile(const std::string& fqFileName,
-                         const ::file_hdr_t& fileHeader) :
+                         const ::file_hdr_t& fileHeader,
+                         const std::string queueName) :
             efpIdentity_(fileHeader._efp_partition, fileHeader._data_size_kib),
             fqFileName_(fqFileName),
             fileSeqNum_(fileHeader._file_number),
+            queueName_(queueName),
             serial_(fileHeader._rhdr._serial),
             firstRecordOffset_(fileHeader._fro),
             fileHandle_(-1),
@@ -66,6 +69,7 @@ JournalFile::JournalFile(const std::stri
             fileHeaderPtr_(0),
             aioControlBlockPtr_(0),
             fileSize_dblks_(((fileHeader._data_size_kib * 1024) + (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_BYTES)) / QLS_DBLK_SIZE_BYTES),
+            initializedFlag_(false),
             enqueuedRecordCount_("JournalFile::enqueuedRecordCount", 0),
             submittedDblkCount_("JournalFile::submittedDblkCount", 0),
             completedDblkCount_("JournalFile::completedDblkCount", 0),
@@ -78,18 +82,21 @@ JournalFile::~JournalFile() {
 
 void
 JournalFile::initialize(const uint32_t completedDblkCount) {
-    if (::posix_memalign(&fileHeaderBasePtr_, QLS_AIO_ALIGN_BOUNDARY_BYTES, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024))
-    {
-        std::ostringstream oss;
-        oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY_BYTES << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024);
-        oss << FORMAT_SYSERR(errno);
-        throw jexception(jerrno::JERR__MALLOC, oss.str(), "JournalFile", "initialize");
+    if (!initializedFlag_) {
+        if (::posix_memalign(&fileHeaderBasePtr_, QLS_AIO_ALIGN_BOUNDARY_BYTES, QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024))
+        {
+            std::ostringstream oss;
+            oss << "posix_memalign(): blksize=" << QLS_AIO_ALIGN_BOUNDARY_BYTES << " size=" << (QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024);
+            oss << FORMAT_SYSERR(errno);
+            throw jexception(jerrno::JERR__MALLOC, oss.str(), "JournalFile", "initialize");
+        }
+        fileHeaderPtr_ = (::file_hdr_t*)fileHeaderBasePtr_;
+        aioControlBlockPtr_ = new aio_cb;
+        initializedFlag_ = true;
     }
-    fileHeaderPtr_ = (::file_hdr_t*)fileHeaderBasePtr_;
-    aioControlBlockPtr_ = new aio_cb;
     if (completedDblkCount > 0UL) {
-        submittedDblkCount_.add(completedDblkCount);
-        completedDblkCount_.add(completedDblkCount);
+        submittedDblkCount_.set(completedDblkCount);
+        completedDblkCount_.set(completedDblkCount);
     }
 }
 
@@ -149,8 +156,7 @@ void JournalFile::asyncFileHeaderWrite(i
                                        const efpDataSize_kib_t efpDataSize_kib,
                                        const uint16_t userFlags,
                                        const uint64_t recordId,
-                                       const uint64_t firstRecordOffset,
-                                       const std::string queueName) {
+                                       const uint64_t firstRecordOffset) {
     firstRecordOffset_ = firstRecordOffset;
     ::file_hdr_create(fileHeaderPtr_, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, efpPartitionNumber, efpDataSize_kib);
     ::file_hdr_init(fileHeaderBasePtr_,
@@ -160,15 +166,15 @@ void JournalFile::asyncFileHeaderWrite(i
                     recordId,
                     firstRecordOffset,
                     fileSeqNum_,
-                    queueName.size(),
-                    queueName.data());
-    aio::prep_pwrite(aioControlBlockPtr_,
-                     fileHandle_,
-                     (void*)fileHeaderBasePtr_,
-                     QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024,
-                     0UL);
-    if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr_) < 0)
-        throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite");
+                    queueName_.size(),
+                    queueName_.data());
+    const std::size_t wr_size = QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_KIB * 1024;
+    aio::prep_pwrite(aioControlBlockPtr_, fileHandle_, (void*)fileHeaderBasePtr_, wr_size, 0UL);
+    if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr_) < 0) {
+        std::ostringstream oss;
+        oss << "queue=\"" << queueName_ << "\" fid=0x" << std::hex <<  fileSeqNum_ << " wr_size=0x" << wr_size << " foffs=0x0";
+        throw jexception(jerrno::JERR__AIO, oss.str(), "JournalFile", "asyncFileHeaderWrite");
+    }
     addSubmittedDblkCount(QLS_JRNL_FHDR_RES_SIZE_SBLKS * QLS_SBLK_SIZE_DBLKS);
     incrOutstandingAioOperationCount();
 }
@@ -177,16 +183,16 @@ void JournalFile::asyncPageWrite(io_cont
                                  aio_cb* aioControlBlockPtr,
                                  void* data,
                                  uint32_t dataSize_dblks) {
-    aio::prep_pwrite_2(aioControlBlockPtr,
-                       fileHandle_,
-                       data,
-                       dataSize_dblks * QLS_DBLK_SIZE_BYTES,
-                       submittedDblkCount_.get() * QLS_DBLK_SIZE_BYTES);
+    const std::size_t wr_size = dataSize_dblks * QLS_DBLK_SIZE_BYTES;
+    const uint64_t foffs = submittedDblkCount_.get() * QLS_DBLK_SIZE_BYTES;
+    aio::prep_pwrite_2(aioControlBlockPtr, fileHandle_, data, wr_size, foffs);
     pmgr::page_cb* pcbp = (pmgr::page_cb*)(aioControlBlockPtr->data); // This page's control block (pcb)
     pcbp->_wdblks = dataSize_dblks;
     pcbp->_jfp = this;
     if (aio::submit(ioContextPtr, 1, &aioControlBlockPtr) < 0) {
-        throw jexception(jerrno::JERR__AIO, "JournalFile", "asyncPageWrite"); // TODO: complete exception details
+        std::ostringstream oss;
+        oss << "queue=\"" << queueName_ << "\" fid=0x" << std::hex <<  fileSeqNum_ << " wr_size=0x" << wr_size << " foffs=0x" << foffs;
+        throw jexception(jerrno::JERR__AIO, oss.str(), "JournalFile", "asyncPageWrite");
     }
     addSubmittedDblkCount(dataSize_dblks);
     incrOutstandingAioOperationCount();

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/JournalFile.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/JournalFile.h?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/JournalFile.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/JournalFile.h Wed Feb 12 13:27:51 2014
@@ -38,6 +38,7 @@ protected:
     const efpIdentity_t efpIdentity_;
     const std::string fqFileName_;
     const uint64_t fileSeqNum_;
+    const std::string queueName_;
     const uint64_t serial_;
     uint64_t firstRecordOffset_;
     int fileHandle_;
@@ -46,6 +47,7 @@ protected:
     ::file_hdr_t* fileHeaderPtr_;
     aio_cb* aioControlBlockPtr_;
     uint32_t fileSize_dblks_;                           ///< File size in data blocks, including file header
+    bool initializedFlag_;
 
     AtomicCounter<uint32_t> enqueuedRecordCount_;       ///< Count of enqueued records
     AtomicCounter<uint32_t> submittedDblkCount_;        ///< Write file count (data blocks) for submitted AIO
@@ -56,10 +58,12 @@ public:
     // Constructor for creating new file with known fileSeqNum and random serial
     JournalFile(const std::string& fqFileName,
                 const efpIdentity_t& efpIdentity,
-                const uint64_t fileSeqNum);
+                const uint64_t fileSeqNum,
+                const std::string queueName);
     // Constructor for recovery in which fileSeqNum and serial are recovered from fileHeader param
     JournalFile(const std::string& fqFileName,
-                const ::file_hdr_t& fileHeader);
+                const ::file_hdr_t& fileHeader,
+                const std::string queueName);
     virtual ~JournalFile();
 
     void initialize(const uint32_t completedDblkCount);
@@ -76,13 +80,13 @@ public:
                               const efpDataSize_kib_t efpDataSize_kib,
                               const uint16_t userFlags,
                               const uint64_t recordId,
-                              const uint64_t firstRecordOffset,
-                              const std::string queueName);
+                              const uint64_t firstRecordOffset);
     void asyncPageWrite(io_context_t ioContextPtr,
                         aio_cb* aioControlBlockPtr,
                         void* data,
                         uint32_t dataSize_dblks);
 
+    uint32_t getSubmittedDblkCount() const;
     uint32_t getEnqueuedRecordCount() const;
     uint32_t incrEnqueuedRecordCount();
     uint32_t decrEnqueuedRecordCount();
@@ -109,7 +113,6 @@ protected:
     static uint64_t getRandom64();
     bool isOpen() const;
 
-    uint32_t getSubmittedDblkCount() const;
     uint32_t addSubmittedDblkCount(const uint32_t a);
 
     uint32_t getCompletedDblkCount() const;

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp Wed Feb 12 13:27:51 2014
@@ -25,8 +25,6 @@
 #include "qpid/linearstore/journal/jcntl.h"
 #include "qpid/linearstore/journal/JournalFile.h"
 
-//#include <iostream> // DEBUG
-
 namespace qpid {
 namespace linearstore {
 namespace journal {
@@ -34,10 +32,10 @@ namespace journal {
 LinearFileController::LinearFileController(jcntl& jcntlRef) :
             jcntlRef_(jcntlRef),
             emptyFilePoolPtr_(0),
-            currentJournalFilePtr_(0),
             fileSeqCounter_("LinearFileController::fileSeqCounter", 0),
             recordIdCounter_("LinearFileController::recordIdCounter", 0),
-            decrCounter_("LinearFileController::decrCounter", 0)
+            decrCounter_("LinearFileController::decrCounter", 0),
+            currentJournalFilePtr_(0)
 {}
 
 LinearFileController::~LinearFileController() {}
@@ -53,7 +51,7 @@ void LinearFileController::initialize(co
 void LinearFileController::finalize() {
     if (currentJournalFilePtr_) {
         currentJournalFilePtr_->close();
-        currentJournalFilePtr_ = NULL;
+        currentJournalFilePtr_ = 0;
     }
     while (!journalFileList_.empty()) {
         delete journalFileList_.front();
@@ -62,17 +60,21 @@ void LinearFileController::finalize() {
 }
 
 void LinearFileController::addJournalFile(JournalFile* journalFilePtr,
-                                          const uint32_t completedDblkCount) {
-    if (currentJournalFilePtr_) {
+                                          const uint32_t completedDblkCount,
+                                          const bool makeCurrentFlag) {
+    if (makeCurrentFlag && currentJournalFilePtr_) {
         currentJournalFilePtr_->close();
+        currentJournalFilePtr_ = 0;
     }
     journalFilePtr->initialize(completedDblkCount);
-    currentJournalFilePtr_ = journalFilePtr;
     {
         slock l(journalFileListMutex_);
-        journalFileList_.push_back(currentJournalFilePtr_);
+        journalFileList_.push_back(journalFilePtr);
+    }
+    if (makeCurrentFlag) {
+        currentJournalFilePtr_ = journalFilePtr;
+        currentJournalFilePtr_->open();
     }
-    currentJournalFilePtr_->open();
 }
 
 efpDataSize_sblks_t LinearFileController::dataSize_sblks() const {
@@ -83,16 +85,20 @@ efpFileSize_sblks_t LinearFileController
     return emptyFilePoolPtr_->fileSize_sblks();
 }
 
+void LinearFileController::getNextJournalFile() {
+    if (currentJournalFilePtr_)
+        currentJournalFilePtr_->close();
+    pullEmptyFileFromEfp();
+}
+
 uint64_t LinearFileController::getNextRecordId() {
     return recordIdCounter_.increment();
 }
 
-void LinearFileController::pullEmptyFileFromEfp() {
-    if (currentJournalFilePtr_)
-        currentJournalFilePtr_->close();
-    std::string ef = emptyFilePoolPtr_->takeEmptyFile(journalDirectory_); // Moves file from EFP only, returns new file name
-//std::cout << "*** LinearFileController::pullEmptyFileFromEfp() qn=" << jcntlRef.id() << " ef=" << ef << std::endl; // DEBUG
-    addJournalFile(ef, emptyFilePoolPtr_->getIdentity(), getNextFileSeqNum(), 0);
+void LinearFileController::removeFileToEfp(const std::string& fileName) {
+    if (emptyFilePoolPtr_) {
+        emptyFilePoolPtr_->returnEmptyFile(fileName);
+    }
 }
 
 void LinearFileController::restoreEmptyFile(const std::string& fileName) {
@@ -101,7 +107,11 @@ void LinearFileController::restoreEmptyF
 
 void LinearFileController::purgeEmptyFilesToEfp() {
     slock l(journalFileListMutex_);
-    purgeEmptyFilesToEfpNoLock();
+    while (journalFileList_.front()->isNoEnqueuedRecordsRemaining() && journalFileList_.size() > 1) { // Can't purge last file, even if it has no enqueued records
+        emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName());
+        delete journalFileList_.front();
+        journalFileList_.pop_front();
+    }
 }
 
 uint32_t LinearFileController::getEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
@@ -113,7 +123,6 @@ uint32_t LinearFileController::incrEnque
 }
 
 uint32_t LinearFileController::decrEnqueuedRecordCount(const efpFileCount_t fileSeqNumber) {
-    slock l(journalFileListMutex_);
     uint32_t r = find(fileSeqNumber)->decrEnqueuedRecordCount();
 
     // TODO: Re-evaluate after testing and profiling
@@ -122,18 +131,16 @@ uint32_t LinearFileController::decrEnque
     // records). We need to check this rather simple scheme works for outlying scenarios (large and tiny data
     // records) without impacting performance or performing badly (leaving excessive empty files in the journals).
     if (decrCounter_.increment() % 100ULL == 0ULL) {
-        purgeEmptyFilesToEfpNoLock();
+        purgeEmptyFilesToEfp();
     }
     return r;
 }
 
 uint32_t LinearFileController::addWriteCompletedDblkCount(const efpFileCount_t fileSeqNumber, const uint32_t a) {
-    slock l(journalFileListMutex_);
     return find(fileSeqNumber)->addCompletedDblkCount(a);
 }
 
 uint16_t LinearFileController::decrOutstandingAioOperationCount(const efpFileCount_t fileSeqNumber) {
-    slock l(journalFileListMutex_);
     return find(fileSeqNumber)->decrOutstandingAioOperationCount();
 }
 
@@ -142,12 +149,11 @@ void LinearFileController::asyncFileHead
                                                 const uint64_t recordId,
                                                 const uint64_t firstRecordOffset) {
     currentJournalFilePtr_->asyncFileHeaderWrite(ioContextPtr,
-                                                 emptyFilePoolPtr_->getPartitionNumber(),
-                                                 emptyFilePoolPtr_->dataSize_kib(),
-                                                 userFlags,
-                                                 recordId,
-                                                 firstRecordOffset,
-                                                 jcntlRef_.id());
+                                              emptyFilePoolPtr_->getPartitionNumber(),
+                                              emptyFilePoolPtr_->dataSize_kib(),
+                                              userFlags,
+                                              recordId,
+                                              firstRecordOffset);
 }
 
 void LinearFileController::asyncPageWrite(io_context_t ioContextPtr,
@@ -195,8 +201,8 @@ void LinearFileController::addJournalFil
                                           const efpIdentity_t& efpIdentity,
                                           const uint64_t fileNumber,
                                           const uint32_t completedDblkCount) {
-    JournalFile* jfp = new JournalFile(fileName, efpIdentity, fileNumber);
-    addJournalFile(jfp, completedDblkCount);
+    JournalFile* jfp = new JournalFile(fileName, efpIdentity, fileNumber, jcntlRef_.id());
+    addJournalFile(jfp, completedDblkCount, true);
 }
 
 void LinearFileController::assertCurrentJournalFileValid(const char* const functionName) const {
@@ -209,15 +215,17 @@ bool LinearFileController::checkCurrentJ
     return currentJournalFilePtr_ != 0;
 }
 
-// NOTE: NOT THREAD SAFE - journalFileList is accessed by multiple threads - use under external lock
 JournalFile* LinearFileController::find(const efpFileCount_t fileSeqNumber) {
-    if (currentJournalFilePtr_ != 0 && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber)
+    if (currentJournalFilePtr_ && currentJournalFilePtr_->getFileSeqNum() == fileSeqNumber)
         return currentJournalFilePtr_;
+
+    slock l(journalFileListMutex_);
     for (JournalFileListItr_t i=journalFileList_.begin(); i!=journalFileList_.end(); ++i) {
         if ((*i)->getFileSeqNum() == fileSeqNumber) {
             return *i;
         }
     }
+
     std::ostringstream oss;
     oss << "fileSeqNumber=" << fileSeqNumber;
     throw jexception(jerrno::JERR_LFCR_SEQNUMNOTFOUND, oss.str(), "LinearFileController", "find");
@@ -227,15 +235,9 @@ uint64_t LinearFileController::getNextFi
     return fileSeqCounter_.increment();
 }
 
-void LinearFileController::purgeEmptyFilesToEfpNoLock() {
-//std::cout << " >P n=" << journalFileList_.size() << " e=" << (journalFileList_.front()->isNoEnqueuedRecordsRemaining()?"T":"F") << std::flush; // DEBUG
-    while (journalFileList_.front()->isNoEnqueuedRecordsRemaining() &&
-           journalFileList_.size() > 1) { // Can't purge last file, even if it has no enqueued records
-//std::cout << " *f=" << journalFileList_.front()->getFqFileName() << std::flush; // DEBUG
-        emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName());
-        delete journalFileList_.front();
-        journalFileList_.pop_front();
-    }
+void LinearFileController::pullEmptyFileFromEfp() {
+    std::string efn = emptyFilePoolPtr_->takeEmptyFile(journalDirectory_); // Moves file from EFP only (ie no file init), returns new file name
+    addJournalFile(efn, emptyFilePoolPtr_->getIdentity(), getNextFileSeqNum(), 0);
 }
 
 }}}

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.h Wed Feb 12 13:27:51 2014
@@ -44,12 +44,12 @@ protected:
     jcntl& jcntlRef_;
     std::string journalDirectory_;
     EmptyFilePool* emptyFilePoolPtr_;
-    JournalFile* currentJournalFilePtr_;
     AtomicCounter<uint64_t> fileSeqCounter_;
     AtomicCounter<uint64_t> recordIdCounter_;
     AtomicCounter<uint64_t> decrCounter_;
 
     JournalFileList_t journalFileList_;
+    JournalFile* currentJournalFilePtr_;
     smutex journalFileListMutex_;
 
 public:
@@ -62,12 +62,14 @@ public:
     void finalize();
 
     void addJournalFile(JournalFile* journalFilePtr,
-                        const uint32_t completedDblkCount);
+                        const uint32_t completedDblkCount,
+                        const bool makeCurrentFlag);
 
     efpDataSize_sblks_t dataSize_sblks() const;
     efpFileSize_sblks_t fileSize_sblks() const;
+    void getNextJournalFile();
     uint64_t getNextRecordId();
-    void pullEmptyFileFromEfp();
+    void removeFileToEfp(const std::string& fileName);
     void restoreEmptyFile(const std::string& fileName);
     void purgeEmptyFilesToEfp();
 
@@ -105,11 +107,12 @@ protected:
     bool checkCurrentJournalFileValid() const;
     JournalFile* find(const efpFileCount_t fileSeqNumber);
     uint64_t getNextFileSeqNum();
-    void purgeEmptyFilesToEfpNoLock();
+    void pullEmptyFileFromEfp();
 };
 
 typedef void (LinearFileController::*lfcAddJournalFileFn)(JournalFile* journalFilePtr,
-                                                          const uint32_t completedDblkCount);
+                                                          const uint32_t completedDblkCount,
+                                                          const bool makeCurrentFlag);
 
 }}}
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp Wed Feb 12 13:27:51 2014
@@ -56,11 +56,15 @@ RecoveredRecordData_t::RecoveredRecordDa
                     pendingTransaction_(ptxn)
 {}
 
-
 bool recordIdListCompare(RecoveredRecordData_t a, RecoveredRecordData_t b) {
     return a.recordId_ < b.recordId_;
 }
 
+RecoveredFileData_t::RecoveredFileData_t(JournalFile* journalFilePtr, const uint32_t completedDblkCount) :
+                    journalFilePtr_(journalFilePtr),
+                    completedDblkCount_(completedDblkCount)
+{}
+
 RecoveryManager::RecoveryManager(const std::string& journalDirectory,
                                  const std::string& queuename,
                                  enq_map& enqueueMapRef,
@@ -77,11 +81,17 @@ RecoveryManager::RecoveryManager(const s
                                                  highestRecordId_(0ULL),
                                                  highestFileNumber_(0ULL),
                                                  lastFileFullFlag_(false),
+                                                 initial_fid_(0),
                                                  currentSerial_(0),
                                                  efpFileSize_kib_(0)
 {}
 
-RecoveryManager::~RecoveryManager() {}
+RecoveryManager::~RecoveryManager() {
+    for (fileNumberMapItr_t i = fileNumberMap_.begin(); i != fileNumberMap_.end(); ++i) {
+        delete i->second;
+    }
+    fileNumberMap_.clear();
+}
 
 void RecoveryManager::analyzeJournals(const std::vector<std::string>* preparedTransactionListPtr,
                                       EmptyFilePoolManager* emptyFilePoolManager,
@@ -92,9 +102,6 @@ void RecoveryManager::analyzeJournals(co
     *emptyFilePoolPtrPtr = emptyFilePoolManager->getEmptyFilePool(efpIdentity);
     efpFileSize_kib_ = (*emptyFilePoolPtrPtr)->fileSize_kib();
 
-    // Check for file full condition
-    lastFileFullFlag_ = endOffset_ == (std::streamoff)(*emptyFilePoolPtrPtr)->fileSize_kib() * 1024;
-
     if (!journalEmptyFlag_) {
 
         // Read all records, establish remaining enqueued records
@@ -106,6 +113,9 @@ void RecoveryManager::analyzeJournals(co
             inFileStream_.close();
         }
 
+        // Check for file full condition
+        lastFileFullFlag_ = endOffset_ == (std::streamoff)(*emptyFilePoolPtrPtr)->fileSize_kib() * 1024;
+
         // Remove leading files which have no enqueued records
         removeEmptyFiles(*emptyFilePoolPtrPtr);
 
@@ -121,7 +131,7 @@ void RecoveryManager::analyzeJournals(co
                     // Unlock any affected enqueues in emap
                     for (tdl_itr_t i=tdl.begin(); i<tdl.end(); i++) {
                         if (i->enq_flag_) { // enq op - decrement enqueue count
-                            fileNumberMap_[i->pfid_]->decrEnqueuedRecordCount();
+                            fileNumberMap_[i->pfid_]->journalFilePtr_->decrEnqueuedRecordCount();
                         } else if (enqueueMapRef_.is_enqueued(i->drid_, true)) { // deq op - unlock enq record
                             if (enqueueMapRef_.unlock(i->drid_) < enq_map::EMAP_OK) { // fail
                                 // enq_map::unlock()'s only error is enq_map::EMAP_RID_NOT_FOUND
@@ -174,7 +184,7 @@ bool RecoveryManager::readNextRemainingR
         }
     } while (!foundRecord);
 
-    if (!inFileStream_.is_open() || currentJournalFileConstItr_->first != recordIdListConstItr_->fileId_) {
+    if (!inFileStream_.is_open() || currentJournalFileItr_->first != recordIdListConstItr_->fileId_) {
         if (!getFile(recordIdListConstItr_->fileId_, false)) {
             std::ostringstream oss;
             oss << "Failed to open file with file-id=" << recordIdListConstItr_->fileId_;
@@ -231,7 +241,6 @@ bool RecoveryManager::readNextRemainingR
     ::rec_tail_t enqueueTail;
     inFileStream_.read((char*)&enqueueTail, sizeof(::rec_tail_t));
     uint32_t cs = checksum.getChecksum();
-//std::cout << std::hex << "### rid=0x" << enqueueHeader._rhdr._rid << " rtcs=0x" << enqueueTail._checksum << " cs=0x" << cs << std::dec << std::endl; // DEBUG
     uint16_t res = ::rec_tail_check(&enqueueTail, &enqueueHeader._rhdr, cs);
     if (res != 0) {
         std::stringstream oss;
@@ -266,17 +275,30 @@ bool RecoveryManager::readNextRemainingR
 void RecoveryManager::setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
                                                       LinearFileController* lfcPtr) {
     if (journalEmptyFlag_) {
-        if (uninitializedJournal_.size() > 0) {
-            lfcPtr->restoreEmptyFile(uninitializedJournal_);
+        if (uninitFileList_.size() > 0) {
+            std::string uninitFile = uninitFileList_.back();
+            uninitFileList_.pop_back();
+            lfcPtr->restoreEmptyFile(uninitFile);
         }
     } else {
         for (fileNumberMapConstItr_t i = fileNumberMap_.begin(); i != fileNumberMap_.end(); ++i) {
-            uint32_t fileDblkCount = i->first == highestFileNumber_ ?               // Is this this last file?
-                                     endOffset_ / QLS_DBLK_SIZE_BYTES :             // Last file uses _endOffset
-                                     efpFileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES; // All others use file size to make them full
-            (lfcPtr->*fnPtr)(i->second, fileDblkCount);
+            (lfcPtr->*fnPtr)(i->second->journalFilePtr_, i->second->completedDblkCount_, i->first == initial_fid_);
         }
     }
+
+    std::ostringstream oss;
+    bool logFlag = !notNeededFilesList_.empty();
+    if (logFlag) {
+        oss << "Files removed from head of journal: prior truncation during recovery:";
+    }
+    while (!notNeededFilesList_.empty()) {
+        lfcPtr->removeFileToEfp(notNeededFilesList_.back());
+        oss << std::endl << " * " << notNeededFilesList_.back();
+        notNeededFilesList_.pop_back();
+    }
+    if (logFlag) {
+        journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss.str());
+    }
 }
 
 std::string RecoveryManager::toString(const std::string& jid) {
@@ -285,7 +307,7 @@ std::string RecoveryManager::toString(co
     oss << "  Number of journal files = " << fileNumberMap_.size() << std::endl;
     oss << "  Journal File List:" << std::endl;
     for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
-        std::string fqFileName = k->second->getFqFileName();
+        std::string fqFileName = k->second->journalFilePtr_->getFqFileName();
         oss << "    " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl;
     }
     oss << "  Enqueue Counts: [ ";
@@ -293,7 +315,7 @@ std::string RecoveryManager::toString(co
         if (l != fileNumberMap_.begin()) {
             oss << ", ";
         }
-        oss << l->second->getEnqueuedRecordCount();
+        oss << l->second->journalFilePtr_->getEnqueuedRecordCount();
     }
     oss << " ]" << std::endl;
     oss << "  Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl;
@@ -330,15 +352,17 @@ std::string RecoveryManager::toLog(const
                          << std::setw(10) << "--------"
                          << std::endl;
         for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
-            std::string fqFileName = k->second->getFqFileName();
+            std::string fqFileName = k->second->journalFilePtr_->getFqFileName();
+            std::ostringstream fid;
+            fid << std::hex << "0x" << k->first;
             std::ostringstream fro;
-            fro << std::hex << "0x" << k->second->getFirstRecordOffset();
-            oss << indentStr << std::setw(7) << k->first
+            fro << std::hex << "0x" << k->second->journalFilePtr_->getFirstRecordOffset();
+            oss << indentStr << std::setw(7) << fid.str()
                              << std::setw(43) << fqFileName.substr(fqFileName.rfind('/')+1)
                              << std::setw(16) << fro.str()
-                             << std::setw(12) << k->second->getEnqueuedRecordCount()
-                             << std::setw(5) << k->second->getEfpIdentity().pn_
-                             << std::setw(9) << k->second->getEfpIdentity().ds_ << "k"
+                             << std::setw(12) << k->second->journalFilePtr_->getEnqueuedRecordCount()
+                             << std::setw(5) << k->second->journalFilePtr_->getEfpIdentity().pn_
+                             << std::setw(9) << k->second->journalFilePtr_->getEfpIdentity().ds_ << "k"
                              << std::endl;
         }
         oss << indentStr << "First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
@@ -347,7 +371,7 @@ std::string RecoveryManager::toLog(const
                 (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
         oss << indentStr << "Highest rid found = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
         oss << indentStr << "Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
-        oss << indentStr << "Enqueued records (txn & non-txn):";
+        //oss << indentStr << "Enqueued records (txn & non-txn):"; // TODO: complete report
     }
     return oss.str();
 }
@@ -357,27 +381,28 @@ std::string RecoveryManager::toLog(const
 void RecoveryManager::analyzeJournalFileHeaders(efpIdentity_t& efpIdentity) {
     std::string headerQueueName;
     ::file_hdr_t fileHeader;
-    directoryList_t directoryList;
+    stringList_t directoryList;
     jdir::read_dir(journalDirectory_, directoryList, false, true, false, true);
-    for (directoryListConstItr_t i = directoryList.begin(); i != directoryList.end(); ++i) {
+    for (stringListConstItr_t i = directoryList.begin(); i != directoryList.end(); ++i) {
         readJournalFileHeader(*i, fileHeader, headerQueueName);
         if (headerQueueName.empty()) {
             std::ostringstream oss;
-            if (uninitializedJournal_.empty()) {
-                oss << "Journal file " << (*i) << " is first uninitialized (not yet written) journal file.";
-                journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss.str());
-                uninitializedJournal_ = *i;
-            } else {
-                oss << "Journal file " << (*i) << " is second or greater uninitialized journal file - ignoring";
-                journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str());
-            }
+            oss << "Journal file " << (*i) << " is uninitialized";
+            journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str());
+            uninitFileList_.push_back(*i);
         } else if (headerQueueName.compare(queueName_) != 0) {
             std::ostringstream oss;
             oss << "Journal file " << (*i) << " belongs to queue \"" << headerQueueName << "\": ignoring";
             journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss.str());
         } else {
-            JournalFile* jfp = new JournalFile(*i, fileHeader);
-            fileNumberMap_[fileHeader._file_number] = jfp;
+            JournalFile* jfp = new JournalFile(*i, fileHeader, queueName_);
+            std::pair<fileNumberMapItr_t, bool> res = fileNumberMap_.insert(
+                            std::pair<uint64_t, RecoveredFileData_t*>(fileHeader._file_number, new RecoveredFileData_t(jfp, 0)));
+            if (!res.second) {
+                std::ostringstream oss;
+                oss << "Journal file " << (*i) << " has fid=0x" << std::hex << jfp->getFileSeqNum() << " which already exists for this journal.";
+                throw jexception(oss.str()); // TODO: complete this exception
+            }
             if (fileHeader._file_number > highestFileNumber_) {
                 highestFileNumber_ = fileHeader._file_number;
             }
@@ -393,7 +418,7 @@ void RecoveryManager::analyzeJournalFile
     if (fileNumberMap_.empty()) {
         journalEmptyFlag_ = true;
     } else {
-        currentJournalFileConstItr_ = fileNumberMap_.begin();
+        currentJournalFileItr_ = fileNumberMap_.begin();
     }
 }
 
@@ -408,7 +433,7 @@ void RecoveryManager::checkFileStreamOk(
     }
 }
 
-void RecoveryManager::checkJournalAlignment(const std::streampos recordPosition) {
+void RecoveryManager::checkJournalAlignment(const uint64_t start_fid, const std::streampos recordPosition) {
     if (recordPosition % QLS_DBLK_SIZE_BYTES != 0) {
         std::ostringstream oss;
         oss << "Current read pointer not dblk aligned: recordPosition=0x" << std::hex << recordPosition;
@@ -420,12 +445,13 @@ void RecoveryManager::checkJournalAlignm
     if (sblkOffset)
     {
         std::ostringstream oss1;
-        oss1 << std::hex << "Bad record alignment found at fid=0x" << getCurrentFileNumber();
+        oss1 << std::hex << "Bad record alignment found at fid=0x" << start_fid;
         oss1 << " offs=0x" << currentPosn << " (likely journal overwrite boundary); " << std::dec;
         oss1 << (QLS_SBLK_SIZE_DBLKS - (sblkOffset/QLS_DBLK_SIZE_BYTES)) << " filler record(s) required.";
         journalLogRef_.log(JournalLog::LOG_WARN, queueName_, oss1.str());
 
-        std::ofstream outFileStream(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::out | std::ios_base::binary);
+        fileNumberMapConstItr_t fnmItr = fileNumberMap_.find(start_fid);
+        std::ofstream outFileStream(fnmItr->second->journalFilePtr_->getFqFileName().c_str(), std::ios_base::in | std::ios_base::out | std::ios_base::binary);
         if (!outFileStream.good()) {
             throw jexception(jerrno::JERR__FILEIO, getCurrentFileName(), "RecoveryManager", "checkJournalAlignment");
         }
@@ -447,7 +473,7 @@ void RecoveryManager::checkJournalAlignm
                 throw jexception(jerrno::JERR_RCVM_WRITE, "RecoveryManager", "checkJournalAlignment");
             }
             std::ostringstream oss2;
-            oss2 << std::hex << "Recover phase write: Wrote filler record: fid=0x" << getCurrentFileNumber();
+            oss2 << std::hex << "Recover phase write: Wrote filler record: fid=0x" << start_fid;
             oss2 << " offs=0x" << currentPosn;
             journalLogRef_.log(JournalLog::LOG_NOTICE, queueName_, oss2.str());
             currentPosn = outFileStream.tellp();
@@ -456,16 +482,15 @@ void RecoveryManager::checkJournalAlignm
         std::free(writeBuffer);
         journalLogRef_.log(JournalLog::LOG_INFO, queueName_, "Bad record alignment fixed.");
     }
-    endOffset_ = currentPosn;
+    lastRecord(start_fid, currentPosn);
 }
 
 bool RecoveryManager::decodeRecord(jrec& record,
                                    std::size_t& cumulativeSizeRead,
                                    ::rec_hdr_t& headerRecord,
-                                   std::streampos& fileOffset)
+                                   const uint64_t start_fid,
+                                   const std::streampos recordOffset)
 {
-    std::streampos start_file_offs = fileOffset;
-
     if (highestRecordId_ == 0) {
         highestRecordId_ = headerRecord._rid;
     } else if (headerRecord._rid - highestRecordId_ < 0x8000000000000000ULL) { // RFC 1982 comparison for unsigned 64-bit
@@ -475,7 +500,7 @@ bool RecoveryManager::decodeRecord(jrec&
     bool done = false;
     while (!done) {
         try {
-            done = record.decode(headerRecord, &inFileStream_, cumulativeSizeRead);
+            done = record.decode(headerRecord, &inFileStream_, cumulativeSizeRead, recordOffset);
         }
         catch (const jexception& e) {
             if (e.err_code() == jerrno::JERR_JREC_BADRECTAIL) {
@@ -485,11 +510,12 @@ bool RecoveryManager::decodeRecord(jrec&
             } else {
                 journalLogRef_.log(JournalLog::LOG_INFO, queueName_, e.what());
             }
-            checkJournalAlignment(start_file_offs);
+            checkJournalAlignment(start_fid, recordOffset);
             return false;
         }
         if (!done && needNextFile()) {
             if (!getNextFile(false)) {
+                checkJournalAlignment(start_fid, recordOffset);
                 return false;
             }
         }
@@ -498,11 +524,11 @@ bool RecoveryManager::decodeRecord(jrec&
 }
 
 std::string RecoveryManager::getCurrentFileName() const {
-    return currentJournalFileConstItr_->second->getFqFileName();
+    return currentJournalFileItr_->second->journalFilePtr_->getFqFileName();
 }
 
 uint64_t RecoveryManager::getCurrentFileNumber() const {
-    return currentJournalFileConstItr_->first;
+    return currentJournalFileItr_->first;
 }
 
 bool RecoveryManager::getFile(const uint64_t fileNumber, bool jumpToFirstRecordOffsetFlag) {
@@ -511,8 +537,8 @@ bool RecoveryManager::getFile(const uint
 //std::cout << " f=" << getCurrentFileName() << "]" << std::flush; // DEBUG
         inFileStream_.clear(); // clear eof flag, req'd for older versions of c++
     }
-    currentJournalFileConstItr_ = fileNumberMap_.find(fileNumber);
-    if (currentJournalFileConstItr_ == fileNumberMap_.end()) {
+    currentJournalFileItr_ = fileNumberMap_.find(fileNumber);
+    if (currentJournalFileItr_ == fileNumberMap_.end()) {
         return false;
     }
     inFileStream_.open(getCurrentFileName().c_str(), std::ios_base::in | std::ios_base::binary);
@@ -536,7 +562,8 @@ bool RecoveryManager::getNextFile(bool j
     if (inFileStream_.is_open()) {
         inFileStream_.close();
 //std::cout << " .f=" << getCurrentFileName() << "]" << std::flush; // DEBUG
-        if (++currentJournalFileConstItr_ == fileNumberMap_.end()) {
+        currentJournalFileItr_->second->completedDblkCount_ = efpFileSize_kib_ * 1024 / QLS_DBLK_SIZE_BYTES;
+        if (++currentJournalFileItr_ == fileNumberMap_.end()) {
             return false;
         }
         inFileStream_.clear(); // clear eof flag, req'd for older versions of c++
@@ -562,13 +589,15 @@ bool RecoveryManager::getNextRecordHeade
     rec_hdr_t h;
 
     bool hdr_ok = false;
-    std::streampos file_pos;
+    uint64_t file_id = 0;
+    std::streampos file_pos = 0;
     while (!hdr_ok) {
         if (needNextFile()) {
             if (!getNextFile(true)) {
                 return false;
             }
         }
+        file_id = currentJournalFileItr_->second->journalFilePtr_->getFileSeqNum();
         file_pos = inFileStream_.tellg();
         if (file_pos == std::streampos(-1)) {
             std::ostringstream oss;
@@ -587,21 +616,21 @@ bool RecoveryManager::getNextRecordHeade
         }
     }
 
+    uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
     switch(h._magic) {
         case QLS_ENQ_MAGIC:
             {
 //std::cout << " 0x" << std::hex << file_pos << ".e.0x" << h._rid << std::dec << std::flush; // DEBUG
                 if (::rec_hdr_check(&h, QLS_ENQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
-                    endOffset_ = file_pos;
+                    lastRecord(file_id, file_pos);
                     return false;
                 }
                 enq_rec er;
-                uint64_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
-                if (!decodeRecord(er, cum_size_read, h, file_pos)) {
+                if (!decodeRecord(er, cum_size_read, h, start_fid, file_pos)) {
                     return false;
                 }
                 if (!er.is_transient()) { // Ignore transient msgs
-                    fileNumberMap_[start_fid]->incrEnqueuedRecordCount();
+                    fileNumberMap_[start_fid]->journalFilePtr_->incrEnqueuedRecordCount();
                     if (er.xid_size()) {
                         er.get_xid(&xidp);
                         if (xidp == 0) {
@@ -629,12 +658,11 @@ bool RecoveryManager::getNextRecordHeade
             {
 //std::cout << " 0x" << std::hex << file_pos << ".d.0x" << h._rid << std::dec << std::flush; // DEBUG
                 if (::rec_hdr_check(&h, QLS_DEQ_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
-                    endOffset_ = file_pos;
+                    lastRecord(file_id, file_pos);
                     return false;
                 }
                 deq_rec dr;
-                uint16_t start_fid = getCurrentFileNumber(); // fid may increment in decode() if record folds over file boundary
-                if (!decodeRecord(dr, cum_size_read, h, file_pos)) {
+                if (!decodeRecord(dr, cum_size_read, h, start_fid, file_pos)) {
                     return false;
                 }
                 if (dr.xid_size()) {
@@ -655,7 +683,7 @@ bool RecoveryManager::getNextRecordHeade
                 } else {
                     uint64_t enq_fid;
                     if (enqueueMapRef_.get_remove_pfid(dr.deq_rid(), enq_fid, true) == enq_map::EMAP_OK) { // ignore not found error
-                        fileNumberMap_[enq_fid]->decrEnqueuedRecordCount();
+                        fileNumberMap_[enq_fid]->journalFilePtr_->decrEnqueuedRecordCount();
                     }
                 }
             }
@@ -664,11 +692,11 @@ bool RecoveryManager::getNextRecordHeade
             {
 //std::cout << " 0x" << std::hex << file_pos << ".a.0x" << h._rid << std::dec << std::flush; // DEBUG
                 if (::rec_hdr_check(&h, QLS_TXA_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
-                    endOffset_ = file_pos;
+                    lastRecord(file_id, file_pos);
                     return false;
                 }
                 txn_rec ar;
-                if (!decodeRecord(ar, cum_size_read, h, file_pos)) {
+                if (!decodeRecord(ar, cum_size_read, h, start_fid, file_pos)) {
                     return false;
                 }
                 // Delete this txn from tmap, unlock any locked records in emap
@@ -680,7 +708,7 @@ bool RecoveryManager::getNextRecordHeade
                 txn_data_list_t tdl = transactionMapRef_.get_remove_tdata_list(xid); // tdl will be empty if xid not found
                 for (tdl_itr_t itr = tdl.begin(); itr != tdl.end(); itr++) {
                     if (itr->enq_flag_) {
-                        fileNumberMap_[itr->pfid_]->decrEnqueuedRecordCount();
+                        fileNumberMap_[itr->pfid_]->journalFilePtr_->decrEnqueuedRecordCount();
                     } else {
                         enqueueMapRef_.unlock(itr->drid_); // ignore not found error
                     }
@@ -691,11 +719,11 @@ bool RecoveryManager::getNextRecordHeade
             {
 //std::cout << " 0x" << std::hex << file_pos << ".c.0x" << h._rid << std::dec << std::flush; // DEBUG
                 if (::rec_hdr_check(&h, QLS_TXC_MAGIC, QLS_JRNL_VERSION, currentSerial_) != 0) {
-                    endOffset_ = file_pos;
+                    lastRecord(file_id, file_pos);
                     return false;
                 }
                 txn_rec cr;
-                if (!decodeRecord(cr, cum_size_read, h, file_pos)) {
+                if (!decodeRecord(cr, cum_size_read, h, start_fid, file_pos)) {
                     return false;
                 }
                 // Delete this txn from tmap, process records into emap
@@ -717,7 +745,7 @@ bool RecoveryManager::getNextRecordHeade
                     } else { // txn dequeue
                         uint64_t enq_fid;
                         if (enqueueMapRef_.get_remove_pfid(itr->drid_, enq_fid, true) == enq_map::EMAP_OK) // ignore not found error
-                            fileNumberMap_[enq_fid]->decrEnqueuedRecordCount();
+                            fileNumberMap_[enq_fid]->journalFilePtr_->decrEnqueuedRecordCount();
                     }
                 }
             }
@@ -729,7 +757,9 @@ bool RecoveryManager::getNextRecordHeade
                 inFileStream_.ignore(rec_dblks * QLS_DBLK_SIZE_BYTES - sizeof(::rec_hdr_t));
                 checkFileStreamOk(false);
                 if (needNextFile()) {
+                    file_pos += rec_dblks * QLS_DBLK_SIZE_BYTES;
                     if (!getNextFile(false)) {
+                        lastRecord(start_fid, file_pos);
                         return false;
                     }
                 }
@@ -737,17 +767,36 @@ bool RecoveryManager::getNextRecordHeade
             break;
         case 0:
 //std::cout << " 0x" << std::hex << file_pos << ".0" << std::dec << std::endl << std::flush; // DEBUG
-            checkJournalAlignment(file_pos);
+            checkJournalAlignment(getCurrentFileNumber(), file_pos);
             return false;
         default:
 //std::cout << " 0x" << std::hex << file_pos << ".?" << std::dec << std::endl << std::flush; // DEBUG
             // Stop as this is the overwrite boundary.
-            checkJournalAlignment(file_pos);
+            checkJournalAlignment(getCurrentFileNumber(), file_pos);
             return false;
     }
     return true;
 }
 
+void RecoveryManager::lastRecord(const uint64_t file_id, const std::streamoff endOffset) {
+    endOffset_ = endOffset;
+    initial_fid_ = file_id;
+    fileNumberMap_[file_id]->completedDblkCount_ = endOffset_ / QLS_DBLK_SIZE_BYTES;
+
+    // Remove any files in fileNumberMap_ beyond initial_fid_
+    fileNumberMapItr_t unwantedFirstItr = fileNumberMap_.find(file_id);
+    if (++unwantedFirstItr != fileNumberMap_.end()) {
+        fileNumberMapItr_t itr = unwantedFirstItr;
+        notNeededFilesList_.push_back(unwantedFirstItr->second->journalFilePtr_->getFqFileName());
+        while (++itr != fileNumberMap_.end()) {
+            notNeededFilesList_.push_back(itr->second->journalFilePtr_->getFqFileName());
+            delete itr->second->journalFilePtr_;
+            delete itr->second;
+        }
+        fileNumberMap_.erase(unwantedFirstItr, fileNumberMap_.end());
+    }
+}
+
 bool RecoveryManager::needNextFile() {
     if (inFileStream_.is_open()) {
         return inFileStream_.eof() || inFileStream_.tellg() >= std::streampos(efpFileSize_kib_ * 1024);
@@ -820,7 +869,7 @@ bool RecoveryManager::readFileHeader() {
         currentSerial_ = fhdr._rhdr._serial;
     } else {
         inFileStream_.close();
-        if (currentJournalFileConstItr_ == fileNumberMap_.begin()) {
+        if (currentJournalFileItr_ == fileNumberMap_.begin()) {
             journalEmptyFlag_ = true;
         }
         return false;
@@ -855,9 +904,11 @@ void RecoveryManager::readJournalFileHea
 }
 
 void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr) {
-    while (fileNumberMap_.begin()->second->getEnqueuedRecordCount() == 0 && fileNumberMap_.size() > 1) {
-//std::cout << "*** File " << i->first << ": " << i->second << " is empty." << std::endl; // DEBUG
-        emptyFilePoolPtr->returnEmptyFile(fileNumberMap_.begin()->second->getFqFileName());
+    while (fileNumberMap_.begin()->second->journalFilePtr_->getEnqueuedRecordCount() == 0 && fileNumberMap_.size() > 1) {
+        RecoveredFileData_t* rfdp = fileNumberMap_.begin()->second;
+        emptyFilePoolPtr->returnEmptyFile(rfdp->journalFilePtr_->getFqFileName());
+        delete rfdp->journalFilePtr_;
+        delete rfdp;
         fileNumberMap_.erase(fileNumberMap_.begin()->first);
     }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h Wed Feb 12 13:27:51 2014
@@ -51,15 +51,21 @@ struct RecoveredRecordData_t {
     RecoveredRecordData_t(const uint64_t rid, const uint64_t fid, const std::streampos foffs, bool ptxn);
 };
 
+struct RecoveredFileData_t {
+    JournalFile* journalFilePtr_;
+    uint32_t completedDblkCount_;
+    RecoveredFileData_t(JournalFile* journalFilePtr, const uint32_t completedDblkCount);
+};
+
 bool recordIdListCompare(RecoveredRecordData_t a, RecoveredRecordData_t b);
 
 class RecoveryManager
 {
 protected:
     // Types
-    typedef std::vector<std::string> directoryList_t;
-    typedef directoryList_t::const_iterator directoryListConstItr_t;
-    typedef std::map<uint64_t, JournalFile*> fileNumberMap_t;
+    typedef std::vector<std::string> stringList_t;
+    typedef stringList_t::const_iterator stringListConstItr_t;
+    typedef std::map<uint64_t, RecoveredFileData_t*> fileNumberMap_t;
     typedef fileNumberMap_t::iterator fileNumberMapItr_t;
     typedef fileNumberMap_t::const_iterator fileNumberMapConstItr_t;
     typedef std::vector<RecoveredRecordData_t> recordIdList_t;
@@ -74,18 +80,20 @@ protected:
 
     // Initial journal analysis data
     fileNumberMap_t fileNumberMap_;             ///< File number - JournalFilePtr map
+    stringList_t notNeededFilesList_;           ///< Files not needed and to be returned to EFP
+    stringList_t uninitFileList_;               ///< File name of uninitialized journal files found during header analysis
     bool journalEmptyFlag_;                     ///< Journal data files empty
     std::streamoff firstRecordOffset_;          ///< First record offset in ffid
     std::streamoff endOffset_;                  ///< End offset (first byte past last record)
     uint64_t highestRecordId_;                  ///< Highest rid found
     uint64_t highestFileNumber_;                ///< Highest file number found
     bool lastFileFullFlag_;                     ///< Last file is full
-    std::string uninitializedJournal_;          ///< File name of uninitialized journal found during header analysis
+    uint64_t initial_fid_;                      ///< File id where initial write after recovery will occur
 
     // State for recovery of individual enqueued records
     uint64_t currentSerial_;
     uint32_t efpFileSize_kib_;
-    fileNumberMapConstItr_t currentJournalFileConstItr_;
+    fileNumberMapConstItr_t currentJournalFileItr_;
     std::string currentFileName_;
     std::ifstream inFileStream_;
     recordIdList_t recordIdList_;
@@ -121,16 +129,18 @@ public:
 protected:
     void analyzeJournalFileHeaders(efpIdentity_t& efpIdentity);
     void checkFileStreamOk(bool checkEof);
-    void checkJournalAlignment(const std::streampos recordPosition);
+    void checkJournalAlignment(const uint64_t start_fid, const std::streampos recordPosition);
     bool decodeRecord(jrec& record,
                       std::size_t& cumulativeSizeRead,
                       ::rec_hdr_t& recordHeader,
-                      std::streampos& fileOffset);
+                      const uint64_t start_fid,
+                      const std::streampos recordOffset);
     std::string getCurrentFileName() const;
     uint64_t getCurrentFileNumber() const;
     bool getFile(const uint64_t fileNumber, bool jumpToFirstRecordOffsetFlag);
     bool getNextFile(bool jumpToFirstRecordOffsetFlag);
     bool getNextRecordHeader();
+    void lastRecord(const uint64_t file_id, const std::streamoff endOffset);
     bool needNextFile();
     void prepareRecordList();
     bool readFileHeader();

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/deq_rec.cpp Wed Feb 12 13:27:51 2014
@@ -181,7 +181,7 @@ deq_rec::encode(void* wptr, uint32_t rec
 }
 
 bool
-deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
+deq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start)
 {
     if (rec_offs == 0)
     {
@@ -228,7 +228,7 @@ deq_rec::decode(::rec_hdr_t& h, std::ifs
             assert(!ifsp->fail() && !ifsp->bad());
             return false;
         }
-        check_rec_tail();
+        check_rec_tail(rec_start);
     }
     ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
     assert(!ifsp->fail() && !ifsp->bad());
@@ -274,7 +274,7 @@ deq_rec::rec_size() const
 }
 
 void
-deq_rec::check_rec_tail() const {
+deq_rec::check_rec_tail(const std::streampos rec_start) const {
     Checksum checksum;
     checksum.addData((const unsigned char*)&_deq_hdr, sizeof(::deq_hdr_t));
     if (_deq_hdr._xidsize > 0) {
@@ -284,7 +284,7 @@ deq_rec::check_rec_tail() const {
     uint16_t res = ::rec_tail_check(&_deq_tail, &_deq_hdr._rhdr, cs);
     if (res != 0) {
         std::stringstream oss;
-        oss << std::hex;
+        oss << std::endl << "  Record offset: 0x" << std::hex << rec_start;
         if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
             oss << std::endl << "  Magic: expected 0x" << ~_deq_hdr._rhdr._magic << "; found 0x" << _deq_tail._xmagic;
         }

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/deq_rec.h Wed Feb 12 13:27:51 2014
@@ -49,7 +49,7 @@ public:
     void reset(const uint64_t serial, const uint64_t rid, const  uint64_t drid, const void* const xidp,
                const std::size_t xidlen, const bool txn_coml_commit);
     uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum);
-    bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs);
+    bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start);
 
     inline bool is_txn_coml_commit() const { return ::is_txn_coml_commit(&_deq_hdr); }
     inline uint64_t rid() const { return _deq_hdr._rhdr._rid; }
@@ -59,7 +59,7 @@ public:
     inline std::size_t data_size() const { return 0; } // This record never carries data
     std::size_t xid_size() const;
     std::size_t rec_size() const;
-    void check_rec_tail() const;
+    void check_rec_tail(const std::streampos rec_start) const;
 
 private:
     virtual void clean();

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/enq_rec.cpp Wed Feb 12 13:27:51 2014
@@ -218,7 +218,7 @@ enq_rec::encode(void* wptr, uint32_t rec
 }
 
 bool
-enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
+enq_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start)
 {
     if (rec_offs == 0)
     {
@@ -291,7 +291,7 @@ enq_rec::decode(::rec_hdr_t& h, std::ifs
             assert(!ifsp->fail() && !ifsp->bad());
             return false;
         }
-        check_rec_tail();
+        check_rec_tail(rec_start);
     }
     ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
     assert(!ifsp->fail() && !ifsp->bad());
@@ -352,7 +352,7 @@ enq_rec::rec_size(const std::size_t xids
 }
 
 void
-enq_rec::check_rec_tail() const {
+enq_rec::check_rec_tail(const std::streampos rec_start) const {
     Checksum checksum;
     checksum.addData((const unsigned char*)&_enq_hdr, sizeof(::enq_hdr_t));
     if (_enq_hdr._xidsize > 0) {
@@ -365,7 +365,7 @@ enq_rec::check_rec_tail() const {
     uint16_t res = ::rec_tail_check(&_enq_tail, &_enq_hdr._rhdr, cs);
     if (res != 0) {
         std::stringstream oss;
-        oss << std::hex;
+        oss << std::endl << "  Record offset: 0x" << std::hex << rec_start;
         if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
             oss << std::endl << "  Magic: expected 0x" << ~_enq_hdr._rhdr._magic << "; found 0x" << _enq_tail._xmagic;
         }

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/enq_rec.h Wed Feb 12 13:27:51 2014
@@ -51,7 +51,7 @@ public:
     void reset(const uint64_t serial, const uint64_t rid, const void* const dbuf, const std::size_t dlen,
                const void* const xidp, const std::size_t xidlen, const bool transient, const bool external);
     uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum);
-    bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs);
+    bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start);
 
     std::size_t get_xid(void** const xidpp);
     std::size_t get_data(void** const datapp);
@@ -63,7 +63,7 @@ public:
     std::size_t rec_size() const;
     static std::size_t rec_size(const std::size_t xidsize, const std::size_t dsize, const bool external);
     inline uint64_t rid() const { return _enq_hdr._rhdr._rid; }
-    void check_rec_tail() const;
+    void check_rec_tail(const std::streampos rec_start) const;
 
 private:
     virtual void clean();

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp Wed Feb 12 13:27:51 2014
@@ -21,6 +21,7 @@
 
 #include "qpid/linearstore/journal/jcntl.h"
 
+#include <iomanip>
 #include "qpid/linearstore/journal/data_tok.h"
 #include "qpid/linearstore/journal/JournalLog.h"
 
@@ -90,7 +91,7 @@ jcntl::initialize(EmptyFilePool* efpp,
     _linearFileController.finalize();
     _jdir.clear_dir(); // Clear any existing journal files
     _linearFileController.initialize(_jdir.dirname(), efpp, 0ULL);
-    _linearFileController.pullEmptyFileFromEfp();
+    _linearFileController.getNextJournalFile();
     _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS);
     _init_flag = true;
 }
@@ -120,6 +121,9 @@ jcntl::recover(EmptyFilePoolManager* efp
     _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toLog(_jid, 5));
     _linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber());
     _recoveryManager.setLinearFileControllerJournals(&qpid::linearstore::journal::LinearFileController::addJournalFile, &_linearFileController);
+    if (_recoveryManager.isLastFileFull()) {
+        _linearFileController.getNextJournalFile();
+    }
     _wmgr.initialize(cbp, wcache_pgsize_sblks, wcache_num_pages, QLS_WMGR_MAXDTOKPP, QLS_WMGR_MAXWAITUS,
             (_recoveryManager.isLastFileFull() ? 0 : _recoveryManager.getEndOffset()));
 
@@ -316,6 +320,20 @@ jcntl::getLinearFileControllerRef() {
     return _linearFileController;
 }
 
+// static
+std::string
+jcntl::str2hexnum(const std::string& str) {
+    if (str.empty()) {
+        return "<null>";
+    }
+    std::ostringstream oss;
+    oss << "(" << str.size() << ")0x" << std::hex;
+    for (unsigned i=str.size(); i>0; --i) {
+        oss << std::setfill('0') << std::setw(2) << (uint16_t)(uint8_t)str[i-1];
+    }
+    return oss.str();
+}
+
 iores
 jcntl::flush(const bool block_till_aio_cmpl)
 {

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcntl.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcntl.h?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcntl.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jcntl.h Wed Feb 12 13:27:51 2014
@@ -537,6 +537,8 @@ public:
     inline virtual void instr_incr_outstanding_aio_cnt() {}
     inline virtual void instr_decr_outstanding_aio_cnt() {}
 
+    static std::string str2hexnum(const std::string& str);
+
 protected:
     static bool _init;
     static bool init_statics();

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp Wed Feb 12 13:27:51 2014
@@ -167,8 +167,8 @@ jerrno::__init()
     _err_map[JERR_LFCR_SEQNUMNOTFOUND] = "JERR_LFCR_SEQNUMNOTFOUND: File sequence number not found";
 
     // class jrec, enq_rec, deq_rec, txn_rec
-    _err_map[JERR_JREC_BADRECHDR] = "JERR_JREC_BADRECHDR: Invalid data record header.";
-    _err_map[JERR_JREC_BADRECTAIL] = "JERR_JREC_BADRECTAIL: Invalid data record tail.";
+    _err_map[JERR_JREC_BADRECHDR] = "JERR_JREC_BADRECHDR: Invalid record header.";
+    _err_map[JERR_JREC_BADRECTAIL] = "JERR_JREC_BADRECTAIL: Invalid record tail.";
 
     // class wmgr
     _err_map[JERR_WMGR_BADPGSTATE] = "JERR_WMGR_BADPGSTATE: Page buffer in illegal state for operation.";

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jrec.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jrec.h?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jrec.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/jrec.h Wed Feb 12 13:27:51 2014
@@ -98,7 +98,7 @@ public:
     * \returns Number of data-blocks encoded.
     */
     virtual uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum) = 0;
-    virtual bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs) = 0;
+    virtual bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start) = 0;
 
     virtual std::string& str(std::string& str) const = 0;
     virtual std::size_t data_size() const = 0;

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.cpp Wed Feb 12 13:27:51 2014
@@ -176,7 +176,7 @@ txn_rec::encode(void* wptr, uint32_t rec
 }
 
 bool
-txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs)
+txn_rec::decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start)
 {
     if (rec_offs == 0)
     {
@@ -218,7 +218,7 @@ txn_rec::decode(::rec_hdr_t& h, std::ifs
             assert(!ifsp->fail() && !ifsp->bad());
             return false;
         }
-        check_rec_tail();
+        check_rec_tail(rec_start);
     }
     ifsp->ignore(rec_size_dblks() * QLS_DBLK_SIZE_BYTES - rec_size());
     assert(!ifsp->fail() && !ifsp->bad());
@@ -266,7 +266,7 @@ txn_rec::rec_size() const
 }
 
 void
-txn_rec::check_rec_tail() const {
+txn_rec::check_rec_tail(const std::streampos rec_start) const {
     Checksum checksum;
     checksum.addData((const unsigned char*)&_txn_hdr, sizeof(::txn_hdr_t));
     if (_txn_hdr._xidsize > 0) {
@@ -276,7 +276,7 @@ txn_rec::check_rec_tail() const {
     uint16_t res = ::rec_tail_check(&_txn_tail, &_txn_hdr._rhdr, cs);
     if (res != 0) {
         std::stringstream oss;
-        oss << std::hex;
+        oss << std::endl << "  Record offset: 0x" << std::hex << rec_start;
         if (res & ::REC_TAIL_MAGIC_ERR_MASK) {
             oss << std::endl << "  Magic: expected 0x" << ~_txn_hdr._rhdr._magic << "; found 0x" << _txn_tail._xmagic;
         }

Modified: qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h?rev=1567616&r1=1567615&r2=1567616&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/cpp/src/qpid/linearstore/journal/txn_rec.h Wed Feb 12 13:27:51 2014
@@ -49,7 +49,7 @@ public:
     void reset(const bool commitFlag, const uint64_t serial, const uint64_t rid, const void* const xidp,
                const std::size_t xidlen);
     uint32_t encode(void* wptr, uint32_t rec_offs_dblks, uint32_t max_size_dblks, Checksum& checksum);
-    bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs);
+    bool decode(::rec_hdr_t& h, std::ifstream* ifsp, std::size_t& rec_offs, const std::streampos rec_start);
 
     std::size_t get_xid(void** const xidpp);
     std::string& str(std::string& str) const;
@@ -57,7 +57,7 @@ public:
     std::size_t xid_size() const;
     std::size_t rec_size() const;
     inline uint64_t rid() const { return _txn_hdr._rhdr._rid; }
-    void check_rec_tail() const;
+    void check_rec_tail(const std::streampos rec_start) const;
 
 private:
     virtual void clean();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org