You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2014/11/04 15:58:47 UTC

svn commit: r1636598 - in /qpid/trunk/qpid/cpp/src/qpid/linearstore: ./ journal/

Author: kpvdr
Date: Tue Nov  4 14:58:47 2014
New Revision: 1636598

URL: http://svn.apache.org/r1636598
Log:
QPID-5671: [linearstore] Add ability to use disk partitions and select per-queue EFPs. This is the first part of resolving this issue and changes the journal directory structure to use symlinks instead of moving the actual files. This opens the way to having files from multiple partitions in the same journal. Other small improvements and tidy-ups also included.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES
    qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
    qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jerrno.h

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/ISSUES Tue Nov  4 14:58:47 2014
@@ -25,31 +25,24 @@ Current/pending:
  ------ -------  ----------------------
    5359 -        Linearstore: Implement new management schema and wire into store
    5360 -        Linearstore: Evaluate and rework logging to produce a consistent log output
-   5361 -        Linearstore: No tests for linearstore functionality currently exist
+   5361 1145359  Linearstore: No tests for linearstore functionality currently exist
                    svn r.1564893 2014-02-05: Added tx-test-soak.sh
                    svn r.1564935 2014-02-05: Added license text to tx-test-soak.sh
+                   svn r.1625283 2014-09-16: Basic python tests from legacystore ported over to linearstore
                    * No existing tests for linearstore:
                    ** Basic broker-level tests for txn and non-txn recovery
                    ** Store-level tests which check write boundary conditions
                    ** EFP tests, including file recovery, error management
                    ** Unit tests
                    ** Basic performance tests
-   5362 -        Linearstore: No store tools exist for examining the journals
-                   svn r.1556888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up.
-                   svn r.1560530 2014-01-22: Bugfixes for qpid_qls_analyze
-                   svn r.1561848 2014-01-27: Bugfixes and enhancements for qpid_qls_analyze
-                   svn r.1564808 2014-02-05: Bugfixes and enhancements for qpid_qls_analyze
-                   svn r.1578899 2014-03-18: Bugfixes and enhancements for qpid_qls_analyze
-                   svn r.1583778 2014-04-01: Bugfix for qpid_qls_analyze
-                   * Store analysis and status
-                   * Recovery/reading of message content
-                   * Empty file pool status and management
    5464 -        [linearstore] Incompletely created journal files accumulate in EFP
-   -    1088944  [Linearstore] store does not return all files to EFP after purging big queue
-   -    1078937  [linearstore] Installation and tests for new store analysis tool qpid-qls-analyze
-                   svn r.1596633 2014-05-21: Modified to run from installed location
-*  6043 1066256  [LinearStore] changing efp size after using store broke the new durable nodes creation
-*  -    1089652  [RFE]: Configuration option for linear store to delete the used journal files instead of recycling them.
+   -    1088944  [Linearstore] store does not return all files to EFP after purging big queue <queue purge issue>
+   6043 1066256  [LinearStore] changing efp size after using store broke the new durable nodes creation
+   -    1067480  [LinearStore] Provide a way to limit max count/size of empty files in EFP
+   -    1067429  [LinearStore] last file from deleted queue is not moved to EFP  <queue delete issue>
+   -    1067482  [LinearStore] Provide a way to prealocate empty pages in EFP
+   5671          [linearstore] Add ability to use disk partitions and select per-queue EFPs
+
 
 
 Fixed/closed:
@@ -118,12 +111,24 @@ NO-JIRA -        Added missing Apache co
    5651       -  [C++ broker] segfault in qpid::linearstore::journal::jdir::clear_dir when declaring durable queue
                    svn r.1582730 2014-03-28 Proposed fix by Pavel Moravec
                    * Bug introduced by r.1578899.
+   5362 1145363  Linearstore: No store tools exist for examining the journals
+                   svn r.1556888 2014-01-09: WIP checkin for linearstore version of qpid_qls_analyze. Needs testing and tidy-up.
+                   svn r.1560530 2014-01-22: Bugfixes for qpid_qls_analyze
+                   svn r.1561848 2014-01-27: Bugfixes and enhancements for qpid_qls_analyze
+                   svn r.1564808 2014-02-05: Bugfixes and enhancements for qpid_qls_analyze
+                   svn r.1578899 2014-03-18: Bugfixes and enhancements for qpid_qls_analyze
+                   svn r.1583778 2014-04-01: Bugfix for qpid_qls_analyze
+                   * Store analysis and status
+                   * Recovery/reading of message content
+                   * Empty file pool status and management
    5661       -  [linearstore] Set default cmake build to exclude linearstore
                    svn r.1584379 2014-04-03 Proposed solution.
                    * Run ccmake, select BUILD_LINEARSTORE to change its value to ON to build.
    5750 1078142  [linearstore] qpidd closes connection with (distributed) transactional client while checking previous transaction, broker signals error (closed by error: Queue Ve0-2: async_dequeue() failed: exception 0x0103 wmgr::get_events() threw JERR__AIO: AIO error)
                    svn r.1594215 2014-05-13 Proposed solution.
                    * jexception 0x0103 wmgr::get_events() threw JERR__AIO: AIO error. (AIO write operation failed: Invalid argument (-22) [pg=0 size=8192 offset=4096 fh=22])
+   5655 1078937  [linearstore] Installation and tests for new store analysis tool qpid-qls-analyze
+                   svn r.1596633 2014-05-21: Modified to run from installed location
    5767 1098118  [linearstore] broker segfaults when recovering journal file with damaged header
                    svn r.1596509 2014-05-21 Proposed solution (committed by pmoravec)
                    svn r.1599243 2014-06-02 Solution to additional case of file header corruption
@@ -134,6 +139,10 @@ NO-JIRA -        Added missing Apache co
                    This turned out to be an AMQP error, fix does not affect store code.
    6043 1089652  [RFE]: Configuration option for linear store to delete or overwrite the used journal files.
                    svn r.1620426 2014-08-25 Proposed solution
+   6147 1152012  [C++ broker linearstore] missing journal id in "trace Mgmt create journal." log
+                   svn r.1631360 2014-10-13 Proposed solution
+   6157 1150397  linearstore: segfault when 2 journals request new journal file from empty EFP
+                   svn r.1632504 2014-10-17 Proposed solution by pmoravec
                    
 
 Ordered checkin list:
@@ -175,6 +184,8 @@ no.   svn r  Q-JIRA     RHBZ       Date 
 30. 1599243    5767  1098118 2014-06-02
 31. 1614665    5924  1124906 2014-07-30
 32. 1620426    6043  1089652 2014-08-25
+33. 1631360    6147  1152012 2014-10-13 (pmoravec)
+34. 1632504    6157  1150397 2014-10-17 (pmoravec)
 
 See above sections for details on these checkins.
 

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.cpp Tue Nov  4 14:58:47 2014
@@ -201,8 +201,7 @@ bool MessageStoreImpl::init(const std::s
 
     if (truncateFlag_)
         truncateInit();
-    else
-        init();
+    init(truncateFlag_);
 
     QLS_LOG(info, "Store module initialized; store-dir=" << storeDir_);
     QLS_LOG(info,   "> Default EFP partition: " << defaultEfpPartitionNumber);
@@ -218,7 +217,7 @@ bool MessageStoreImpl::init(const std::s
     return isInit;
 }
 
-void MessageStoreImpl::init()
+void MessageStoreImpl::init(const bool truncateFlag)
 {
     const int retryMax = 3;
     int bdbRetryCnt = 0;
@@ -296,6 +295,7 @@ void MessageStoreImpl::init()
                                                           defaultEfpPartitionNumber,
                                                           defaultEfpFileSize_kib,
                                                           overwriteBeforeReturnFlag,
+                                                          truncateFlag,
                                                           jrnlLog));
     efpMgr->findEfpPartitions();
 }
@@ -337,13 +337,12 @@ void MessageStoreImpl::truncateInit()
         isInit = false;
     }
 
-    // TODO: Linearstore: harvest all discareded journal files into the empy file pool(s).
-
     qpid::linearstore::journal::jdir::delete_dir(getBdbBaseDir());
+
+    // TODO: Linearstore: harvest all discarded journal files into the empty file pool(s).
     qpid::linearstore::journal::jdir::delete_dir(getJrnlBaseDir());
     qpid::linearstore::journal::jdir::delete_dir(getTplBaseDir());
     QLS_LOG(info, "Store directory " << getStoreTopLevelDir() << " was truncated.");
-    init();
 }
 
 void MessageStoreImpl::chkTplStoreInit()

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/MessageStoreImpl.h Tue Nov  4 14:58:47 2014
@@ -157,7 +157,7 @@ class MessageStoreImpl : public qpid::br
     static qpid::linearstore::journal::efpDataSize_kib_t chkEfpFileSizeKiB(const qpid::linearstore::journal::efpDataSize_kib_t efpFileSizeKiB,
                                                               const std::string& paramName);
 
-    void init();
+    void init(const bool truncateFlag);
 
     void recoverQueues(TxnCtxt& txn,
                        qpid::broker::RecoveryManager& recovery,

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.cpp Tue Nov  4 14:58:47 2014
@@ -33,37 +33,53 @@
 #include <unistd.h>
 #include <vector>
 
-//#include <iostream> // DEBUG
-
 namespace qpid {
 namespace linearstore {
 namespace journal {
 
+// static
+std::string EmptyFilePool::s_inuseFileDirectory_ = "in_use";
+
+// static
+std::string EmptyFilePool::s_returnedFileDirectory_ = "returned";
+
 EmptyFilePool::EmptyFilePool(const std::string& efpDirectory,
                              const EmptyFilePoolPartition* partitionPtr,
+                             const bool overwriteBeforeReturnFlag,
+                             const bool truncateFlag,
                              JournalLog& journalLogRef) :
                 efpDirectory_(efpDirectory),
                 efpDataSize_kib_(dataSizeFromDirName_kib(efpDirectory, partitionPtr->getPartitionNumber())),
                 partitionPtr_(partitionPtr),
+                overwriteBeforeReturnFlag_(overwriteBeforeReturnFlag),
+                truncateFlag_(truncateFlag),
                 journalLogRef_(journalLogRef)
 {}
 
 EmptyFilePool::~EmptyFilePool() {}
 
 void EmptyFilePool::initialize() {
+//std::cout << "*** Initializing EFP " << efpDataSize_kib_ << "k in partition " << partitionPtr_->getPartitionNumber() << "; efpDirectory=" << efpDirectory_ << std::endl; // DEBUG
     std::vector<std::string> dirList;
+
+    // Process empty files in main dir
     jdir::read_dir(efpDirectory_, dirList, false, true, false, false);
     for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
         size_t dotPos = i->rfind(".");
         if (dotPos != std::string::npos) {
             if (i->substr(dotPos).compare(".jrnl") == 0 && i->length() == 41) {
-                std::string emptyFile(efpDirectory_ + "/" + (*i));
-                if (validateEmptyFile(emptyFile)) {
-                    pushEmptyFile(emptyFile);
+                std::string emptyFileName(efpDirectory_ + "/" + (*i));
+                if (validateEmptyFile(emptyFileName)) {
+                    pushEmptyFile(emptyFileName);
                 }
             }
         }
     }
+
+    // Create 'in_use' and 'returned' subdirs if they don't already exist
+    // Retern files to EFP in 'in_use' and 'returned' subdirs if they do exist
+    initializeSubDirectory(efpDirectory_ + "/" + s_inuseFileDirectory_);
+    initializeSubDirectory(efpDirectory_ + "/" + s_returnedFileDirectory_);
 }
 
 efpDataSize_kib_t EmptyFilePool::dataSize_kib() const {
@@ -106,36 +122,29 @@ const efpIdentity_t EmptyFilePool::getId
 
 std::string EmptyFilePool::takeEmptyFile(const std::string& destDirectory) {
     std::string emptyFileName = popEmptyFile();
-    std::string newFileName = destDirectory + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/'
-    if (moveEmptyFile(emptyFileName.c_str(), newFileName.c_str())) {
+    std::string newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/'));
+    std::string symlinkName = destDirectory + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/'
+    if (moveFile(emptyFileName, newFileName)) {
         // Try again with new UUID for file name
-        newFileName = destDirectory + "/" + getEfpFileName();
-        if (moveEmptyFile(emptyFileName.c_str(), newFileName.c_str())) {
+        newFileName = efpDirectory_ + "/" + s_inuseFileDirectory_ + "/" + getEfpFileName();
+        if (moveFile(emptyFileName, newFileName)) {
+//std::cerr << "*** DEBUG: pushEmptyFile " << emptyFileName << "from  EmptyFilePool::takeEmptyFile()" << std::endl; // DEBUG
             pushEmptyFile(emptyFileName);
             std::ostringstream oss;
             oss << "file=\"" << emptyFileName << "\" dest=\"" <<  newFileName << "\"" << FORMAT_SYSERR(errno);
             throw jexception(jerrno::JERR_JDIR_FMOVE, oss.str(), "EmptyFilePool", "takeEmptyFile");
         }
     }
-    return newFileName;
+    if (createSymLink(newFileName, symlinkName)) {
+        std::ostringstream oss;
+        oss << "file=\"" << emptyFileName << "\" dest=\"" <<  newFileName << "\" symlink=\"" << symlinkName << "\"" << FORMAT_SYSERR(errno);
+        throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "takeEmptyFile");
+    }
+    return symlinkName;
 }
 
-void EmptyFilePool::returnEmptyFile(const std::string& fqSrcFile) {
-    std::string emptyFileName(efpDirectory_ + fqSrcFile.substr(fqSrcFile.rfind('/'))); // NOTE: substr() includes leading '/'
-    if (moveEmptyFile(fqSrcFile.c_str(), emptyFileName.c_str())) {
-        // Try again with new UUID for file name
-        emptyFileName = efpDirectory_ + "/" + getEfpFileName();
-        if (moveEmptyFile(fqSrcFile.c_str(), emptyFileName.c_str())) {
-            // Failed twice in a row - delete file
-            ::unlink(fqSrcFile.c_str());
-            return;
-        }
-    }
-    resetEmptyFileHeader(emptyFileName);
-    if (partitionPtr_->getOverwriteBeforeReturnFlag()) {
-        overwriteFileContents(emptyFileName);
-    }
-    pushEmptyFile(emptyFileName);
+void EmptyFilePool::returnEmptyFileSymlink(const std::string& emptyFileSymlink) {
+    returnEmptyFile(deleteSymlink(emptyFileSymlink));
 }
 
 //static
@@ -173,12 +182,12 @@ efpDataSize_kib_t EmptyFilePool::dataSiz
 
 // --- protected functions ---
 
-// WARNING: this method needs to be called under the scope of emptyFileListMutex_ lock
-void EmptyFilePool::createEmptyFile() {
+std::string EmptyFilePool::createEmptyFile() {
     std::string efpfn = getEfpFileName();
-    if (overwriteFileContents(efpfn)) {
-        emptyFileList_.push_back(efpfn);
+    if (!overwriteFileContents(efpfn)) {
+        // TODO: handle failure to prepare new file here
     }
+    return efpfn;
 }
 
 std::string EmptyFilePool::getEfpFileName() {
@@ -188,6 +197,27 @@ std::string EmptyFilePool::getEfpFileNam
     return oss.str();
 }
 
+void EmptyFilePool::initializeSubDirectory(const std::string& fqDirName) {
+    std::vector<std::string> dirList;
+    if (jdir::exists(fqDirName)) {
+        if (truncateFlag_) {
+            jdir::read_dir(fqDirName, dirList, false, true, false, false);
+            for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
+                size_t dotPos = i->rfind(".");
+                if (i->substr(dotPos).compare(".jrnl") == 0 && i->length() == 41) {
+                    returnEmptyFile(fqDirName + "/" + (*i));
+                } else {
+                    std::ostringstream oss;
+                    oss << "File \'" << *i << "\' was not a journal file and was not returned to EFP.";
+                    journalLogRef_.log(JournalLog::LOG_WARN, oss.str());
+                }
+            }
+        }
+    } else {
+        jdir::create_dir(fqDirName);
+    }
+}
+
 bool EmptyFilePool::overwriteFileContents(const std::string& fqFileName) {
     ::file_hdr_t fh;
     ::file_hdr_create(&fh, QLS_FILE_MAGIC, QLS_JRNL_VERSION, QLS_JRNL_FHDR_RES_SIZE_SBLKS, partitionPtr_->getPartitionNumber(), efpDataSize_kib_);
@@ -199,22 +229,27 @@ bool EmptyFilePool::overwriteFileContent
             ofs.put('\0');
         ofs.close();
         return true;
-//std::cout << "WARNING: EFP " << efpDirectory << " is empty - created new journal file " << efpfn.substr(efpfn.rfind('/') + 1) << " on the fly" << std::endl; // DEBUG
+//std::cout << "*** WARNING: EFP " << efpDirectory_ << " is empty - created new journal file " << fqFileName.substr(fqFileName.rfind('/') + 1) << " on the fly" << std::endl; // DEBUG
     } else {
-//std::cerr << "ERROR: Unable to open file \"" << efpfn << "\"" << std::endl; // DEBUG
+//std::cerr << "*** ERROR: Unable to open file \"" << fqFileName << "\"" << std::endl; // DEBUG
     }
     return false;
 }
 
 std::string EmptyFilePool::popEmptyFile() {
     std::string emptyFileName;
+    bool listEmptyFlag;
     {
         slock l(emptyFileListMutex_);
-        if (emptyFileList_.empty()) {
-            createEmptyFile();
+        listEmptyFlag = emptyFileList_.empty();
+        if (!listEmptyFlag) {
+            emptyFileName = emptyFileList_.front();
+            emptyFileList_.pop_front();
         }
-        emptyFileName = emptyFileList_.front();
-        emptyFileList_.pop_front();
+    }
+    // If the list is empty, create a new file and return the file name.
+    if (listEmptyFlag) {
+        emptyFileName = createEmptyFile();
     }
     return emptyFileName;
 }
@@ -224,6 +259,28 @@ void EmptyFilePool::pushEmptyFile(const 
     emptyFileList_.push_back(fqFileName);
 }
 
+void EmptyFilePool::returnEmptyFile(const std::string& emptyFileName) {
+    std::string returnedFileName = efpDirectory_ + "/" + s_returnedFileDirectory_ + emptyFileName.substr(emptyFileName.rfind('/')); // NOTE: substr() includes leading '/'
+    if (moveFile(emptyFileName, returnedFileName)) {
+        ::unlink(emptyFileName.c_str());
+//std::cerr << "*** WARNING: Unable to move file " << emptyFileName << " to " << returnedFileName << "; deleted." << std::endl; // DEBUG
+    }
+
+    // TODO: On a separate thread, process returned files by overwriting headers and, optionally, their contents and
+    // returning them to the EFP directory
+    resetEmptyFileHeader(returnedFileName);
+    if (overwriteBeforeReturnFlag_) {
+        overwriteFileContents(returnedFileName);
+    }
+    std::string sanitizedEmptyFileName = efpDirectory_ + returnedFileName.substr(returnedFileName.rfind('/')); // NOTE: substr() includes leading '/'
+    if (moveFile(returnedFileName, sanitizedEmptyFileName)) {
+        ::unlink(returnedFileName.c_str());
+//std::cerr << "*** WARNING: Unable to move file " << returnedFileName << " to " << sanitizedEmptyFileName << "; deleted." << std::endl; // DEBUG
+    } else {
+        pushEmptyFile(sanitizedEmptyFileName);
+    }
+}
+
 void EmptyFilePool::resetEmptyFileHeader(const std::string& fqFileName) {
     std::fstream fs(fqFileName.c_str(), std::fstream::in | std::fstream::out | std::fstream::binary);
     if (fs.good()) {
@@ -239,14 +296,14 @@ void EmptyFilePool::resetEmptyFileHeader
             fs.write(buff, buffsize);
             std::streampos bytesWritten = fs.tellp();
             if (std::streamoff(bytesWritten) != buffsize) {
-//std::cerr << "ERROR: Unable to write file header of file \"" << fqFileName_ << "\": tried to write " << buffsize << " bytes; wrote " << bytesWritten << " bytes." << std::endl;
+//std::cerr << "*** ERROR: Unable to write file header of file \"" << fqFileName << "\": tried to write " << buffsize << " bytes; wrote " << bytesWritten << " bytes." << std::endl; // DEBUG
             }
         } else {
-//std::cerr << "ERROR: Unable to read file header of file \"" << fqFileName_ << "\": tried to read " << sizeof(::file_hdr_t) << " bytes; read " << bytesRead << " bytes." << std::endl;
+//std::cerr << "*** ERROR: Unable to read file header of file \"" << fqFileName << "\": tried to read " << sizeof(::file_hdr_t) << " bytes; read " << bytesRead << " bytes." << std::endl; // DEBUG
         }
         fs.close();
     } else {
-//std::cerr << "ERROR: Unable to open file \"" << fqFileName_ << "\" for reading" << std::endl; // DEBUG
+//std::cerr << "*** ERROR: Unable to open file \"" << fqFileName << "\" for reading" << std::endl; // DEBUG
     }
 }
 
@@ -329,7 +386,7 @@ bool EmptyFilePool::validateEmptyFile(co
 }
 
 // static
-int EmptyFilePool::moveEmptyFile(const std::string& from,
+int EmptyFilePool::moveFile(const std::string& from,
                                  const std::string& to) {
     if (::rename(from.c_str(), to.c_str())) {
         if (errno == EEXIST) return errno; // File name exists
@@ -340,4 +397,29 @@ int EmptyFilePool::moveEmptyFile(const s
     return 0;
 }
 
+//static
+int EmptyFilePool::createSymLink(const std::string& fqFileName,
+                                 const std::string& fqLinkName) {
+    if(::symlink(fqFileName.c_str(), fqLinkName.c_str())) {
+        if (errno == EEXIST) return errno; // File name exists
+        std::ostringstream oss;
+        oss << "file=\"" << fqFileName << "\" symlink=\"" <<  fqLinkName << "\"" << FORMAT_SYSERR(errno);
+        throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "createSymLink");
+    }
+    return 0;
+}
+
+//static
+std::string EmptyFilePool::deleteSymlink(const std::string& fqLinkName) {
+    char buff[1024];
+    ssize_t len = ::readlink(fqLinkName.c_str(), buff, 1024);
+    if (len < 0) {
+        std::ostringstream oss;
+        oss << "symlink=\"" << fqLinkName << "\"" << FORMAT_SYSERR(errno);
+        throw jexception(jerrno::JERR_EFP_SYMLINK, oss.str(), "EmptyFilePool", "deleteSymlink");
+    }
+    ::unlink(fqLinkName.c_str());
+    return std::string(buff, len);
+}
+
 }}}

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePool.h Tue Nov  4 14:58:47 2014
@@ -44,11 +44,16 @@ class EmptyFilePool
 {
 protected:
     typedef std::deque<std::string> emptyFileList_t;
-    typedef emptyFileList_t::iterator emptyFileListItr_t;
+    typedef emptyFileList_t::const_iterator emptyFileListConstItr_t;
+
+    static std::string s_inuseFileDirectory_;
+    static std::string s_returnedFileDirectory_;
 
     const std::string efpDirectory_;
     const efpDataSize_kib_t efpDataSize_kib_;
     const EmptyFilePoolPartition* partitionPtr_;
+    const bool overwriteBeforeReturnFlag_;
+    const bool truncateFlag_;
     JournalLog& journalLogRef_;
 
 private:
@@ -58,6 +63,8 @@ private:
 public:
     EmptyFilePool(const std::string& efpDirectory,
                   const EmptyFilePoolPartition* partitionPtr,
+                  const bool overwriteBeforeReturnFlag,
+                  const bool truncateFlag,
                   JournalLog& journalLogRef);
     virtual ~EmptyFilePool();
 
@@ -73,23 +80,28 @@ public:
     const efpIdentity_t getIdentity() const;
 
     std::string takeEmptyFile(const std::string& destDirectory);
-    void returnEmptyFile(const std::string& srcFile);
+    void returnEmptyFileSymlink(const std::string& emptyFileSymlink);
 
     static std::string dirNameFromDataSize(const efpDataSize_kib_t efpDataSize_kib);
     static efpDataSize_kib_t dataSizeFromDirName_kib(const std::string& dirName,
                                                      const efpPartitionNumber_t partitionNumber);
 
 protected:
-    void createEmptyFile();
+    std::string createEmptyFile();
     std::string getEfpFileName();
-    std::string popEmptyFile();
+    void initializeSubDirectory(const std::string& fqDirName);
     bool overwriteFileContents(const std::string& fqFileName);
+    std::string popEmptyFile();
     void pushEmptyFile(const std::string fqFileName);
+    void returnEmptyFile(const std::string& emptyFileName);
     void resetEmptyFileHeader(const std::string& fqFileName);
     bool validateEmptyFile(const std::string& emptyFileName) const;
 
-    static int moveEmptyFile(const std::string& fromFqPath,
-                             const std::string& toFqPath);
+    static int moveFile(const std::string& fromFqPath,
+                        const std::string& toFqPath);
+    static int createSymLink(const std::string& fqFileName,
+                             const std::string& fqLinkName);
+    static std::string deleteSymlink(const std::string& fqLinkName);
 };
 
 }}}

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.cpp Tue Nov  4 14:58:47 2014
@@ -27,8 +27,6 @@
 #include "qpid/linearstore/journal/JournalLog.h"
 #include "qpid/linearstore/journal/slock.h"
 
-//#include <iostream> // DEBUG
-
 namespace qpid {
 namespace linearstore {
 namespace journal {
@@ -37,11 +35,13 @@ EmptyFilePoolManager::EmptyFilePoolManag
                                            const efpPartitionNumber_t defaultPartitionNumber,
                                            const efpDataSize_kib_t defaultEfpDataSize_kib,
                                            const bool overwriteBeforeReturnFlag,
+                                           const bool truncateFlag,
                                            JournalLog& journalLogRef) :
                 qlsStorePath_(qlsStorePath),
                 defaultPartitionNumber_(defaultPartitionNumber),
                 defaultEfpDataSize_kib_(defaultEfpDataSize_kib),
                 overwriteBeforeReturnFlag_(overwriteBeforeReturnFlag),
+                truncateFlag_(truncateFlag),
                 journalLogRef_(journalLogRef)
 {}
 
@@ -63,20 +63,7 @@ void EmptyFilePoolManager::findEfpPartit
             efpPartitionNumber_t pn = EmptyFilePoolPartition::getPartitionNumber(*i);
             if (pn > 0) { // valid partition name found
                 std::string fullDirPath(qlsStorePath_ + "/" + (*i));
-                EmptyFilePoolPartition* efppp = 0;
-                try {
-                    efppp = new EmptyFilePoolPartition(pn, fullDirPath, overwriteBeforeReturnFlag_, journalLogRef_);
-                    {
-                        slock l(partitionMapMutex_);
-                        partitionMap_[pn] = efppp;
-                    }
-                } catch (const std::exception& e) {
-                    if (efppp != 0) {
-                        delete efppp;
-                        efppp = 0;
-                    }
-//std::cerr << "Unable to initialize partition " << pn << " (\'" << fullDirPath << "\'): " << e.what() << std::endl; // DEBUG
-                }
+                EmptyFilePoolPartition* efppp = insertPartition(pn, fullDirPath);
                 if (efppp != 0)
                     efppp->findEmptyFilePools();
                 foundPartition = true;
@@ -85,37 +72,28 @@ void EmptyFilePoolManager::findEfpPartit
 
         // If no partition was found, create an empty default partition.
         if (!foundPartition) {
-            journalLogRef_.log(JournalLog::LOG_INFO, "No EFP partition found, creating an empty partition.");
-            std::ostringstream oss;
-            oss << qlsStorePath_ << "/" << EmptyFilePoolPartition::getPartionDirectoryName(defaultPartitionNumber_)
-                << "/" << EmptyFilePoolPartition::s_efpTopLevelDir_ << "/" << EmptyFilePool::dirNameFromDataSize(defaultEfpDataSize_kib_);
-            jdir::create_dir(oss.str());
+            std::ostringstream oss1;
+            oss1 << qlsStorePath_ << "/" << EmptyFilePoolPartition::getPartionDirectoryName(defaultPartitionNumber_)
+                << "/" << EmptyFilePool::dirNameFromDataSize(defaultEfpDataSize_kib_);
+            jdir::create_dir(oss1.str());
+            insertPartition(defaultPartitionNumber_, oss1.str());
+            std::ostringstream oss2;
+            oss2 << "No EFP partition found, creating an empty partition at " << oss1.str();
+            journalLogRef_.log(JournalLog::LOG_INFO, oss2.str());
         }
     }
 
     journalLogRef_.log(JournalLog::LOG_INFO, "EFP Manager initialization complete");
     std::vector<qpid::linearstore::journal::EmptyFilePoolPartition*> partitionList;
-    std::vector<qpid::linearstore::journal::EmptyFilePool*> filePoolList;
     getEfpPartitions(partitionList);
     if (partitionList.size() == 0) {
         journalLogRef_.log(JournalLog::LOG_WARN, "NO EFP PARTITIONS FOUND! No queue creation is possible.");
     } else {
         std::stringstream oss;
-        oss << "> EFP Partitions found: " << partitionList.size();
+        oss << "EFP Partitions found: " << partitionList.size();
         journalLogRef_.log(JournalLog::LOG_INFO, oss.str());
         for (std::vector<qpid::linearstore::journal::EmptyFilePoolPartition*>::const_iterator i=partitionList.begin(); i!= partitionList.end(); ++i) {
-            filePoolList.clear();
-            (*i)->getEmptyFilePools(filePoolList);
-            std::stringstream oss;
-            oss << "  * Partition " << (*i)->getPartitionNumber() << " containing " << filePoolList.size()
-                << " pool" << (filePoolList.size()>1 ? "s" : "") << " at \'" << (*i)->getPartitionDirectory() << "\'";
-            journalLogRef_.log(JournalLog::LOG_INFO, oss.str());
-            for (std::vector<qpid::linearstore::journal::EmptyFilePool*>::const_iterator j=filePoolList.begin(); j!=filePoolList.end(); ++j) {
-                std::ostringstream oss;
-                oss << "    - EFP \'" << (*j)->dataSize_kib() << "k\' containing " << (*j)->numEmptyFiles() <<
-                              " files of size " << (*j)->dataSize_kib() << " KiB totaling " << (*j)->cumFileSize_kib() << " KiB";
-            journalLogRef_.log(JournalLog::LOG_INFO, oss.str());
-            }
+            journalLogRef_.log(JournalLog::LOG_INFO, (*i)->toString(5U));
         }
     }
 }
@@ -210,4 +188,22 @@ uint16_t EmptyFilePoolManager::getNumEfp
     return partitionMap_.size();
 }
 
+EmptyFilePoolPartition* EmptyFilePoolManager::insertPartition(const efpPartitionNumber_t pn, const std::string& fullPartitionPath) {
+    EmptyFilePoolPartition* efppp = 0;
+    try {
+        efppp = new EmptyFilePoolPartition(pn, fullPartitionPath, overwriteBeforeReturnFlag_, truncateFlag_, journalLogRef_);
+        {
+            slock l(partitionMapMutex_);
+            partitionMap_[pn] = efppp;
+        }
+    } catch (const std::exception& e) {
+        if (efppp != 0) {
+            delete efppp;
+            efppp = 0;
+        }
+//std::cerr << "*** Unable to initialize partition " << pn << " (\'" << fullPartitionPath << "\'): " << e.what() << std::endl; // DEBUG
+    }
+    return efppp;
+}
+
 }}}

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolManager.h Tue Nov  4 14:58:47 2014
@@ -46,6 +46,7 @@ protected:
     const efpPartitionNumber_t defaultPartitionNumber_;
     const efpDataSize_kib_t defaultEfpDataSize_kib_;
     const bool overwriteBeforeReturnFlag_;
+    const bool truncateFlag_;
     JournalLog& journalLogRef_;
     partitionMap_t partitionMap_;
     smutex partitionMapMutex_;
@@ -55,6 +56,7 @@ public:
                          const efpPartitionNumber_t defaultPartitionNumber,
                          const efpDataSize_kib_t defaultEfpDataSize_kib,
                          const bool overwriteBeforeReturnFlag,
+                         const bool truncateFlag,
                          JournalLog& journalLogRef_);
     virtual ~EmptyFilePoolManager();
 
@@ -72,6 +74,8 @@ public:
     void getEmptyFilePools(std::vector<EmptyFilePool*>& emptyFilePoolList,
                            const efpPartitionNumber_t efpPartitionNumber = 0);
     uint16_t getNumEfpPartitions() const;
+protected:
+    EmptyFilePoolPartition* insertPartition(const efpPartitionNumber_t pn, const std::string& fullPartitionPath);
 };
 
 }}}

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.cpp Tue Nov  4 14:58:47 2014
@@ -26,22 +26,19 @@
 #include "qpid/linearstore/journal/jdir.h"
 #include "qpid/linearstore/journal/slock.h"
 
-//#include <iostream> // DEBUG
-
 namespace qpid {
 namespace linearstore {
 namespace journal {
 
-// static
-const std::string EmptyFilePoolPartition::s_efpTopLevelDir_("efp"); // Sets the top-level efp dir within a partition
-
 EmptyFilePoolPartition::EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum,
                                                const std::string& partitionDir,
                                                const bool overwriteBeforeReturnFlag,
+                                               const bool truncateFlag,
                                                JournalLog& journalLogRef) :
                 partitionNum_(partitionNum),
                 partitionDir_(partitionDir),
                 overwriteBeforeReturnFlag_(overwriteBeforeReturnFlag),
+                truncateFlag_(truncateFlag),
                 journalLogRef_(journalLogRef)
 {
     validatePartitionDir();
@@ -57,25 +54,13 @@ EmptyFilePoolPartition::~EmptyFilePoolPa
 
 void
 EmptyFilePoolPartition::findEmptyFilePools() {
-//std::cout << "Reading " << partitionDir << std::endl; // DEBUG
+//std::cout << "*** Reading " << partitionDir_ << std::endl; // DEBUG
     std::vector<std::string> dirList;
-    jdir::read_dir(partitionDir_, dirList, true, false, false, false);
-    bool foundEfpDir = false;
-    for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
-        if (i->compare(s_efpTopLevelDir_) == 0) {
-            foundEfpDir = true;
-            break;
-        }
-    }
-    if (foundEfpDir) {
-        std::string efpDir(partitionDir_ + "/" + s_efpTopLevelDir_);
-//std::cout << "Reading " << efpDir << std::endl; // DEBUG
-        dirList.clear();
-        jdir::read_dir(efpDir, dirList, true, false, false, true);
+        jdir::read_dir(partitionDir_, dirList, true, false, false, true);
         for (std::vector<std::string>::iterator i = dirList.begin(); i != dirList.end(); ++i) {
             EmptyFilePool* efpp = 0;
             try {
-                efpp = new EmptyFilePool(*i, this, journalLogRef_);
+                efpp = new EmptyFilePool(*i, this, overwriteBeforeReturnFlag_, truncateFlag_, journalLogRef_);
                 {
                     slock l(efpMapMutex_);
                     efpMap_[efpp->dataSize_kib()] = efpp;
@@ -88,13 +73,14 @@ EmptyFilePoolPartition::findEmptyFilePoo
                 }
                 //std::cerr << "WARNING: " << e.what() << std::endl;
             }
-            if (efpp != 0)
+            if (efpp != 0) {
                 efpp->initialize();
+            }
         }
-    }
 }
 
 EmptyFilePool* EmptyFilePoolPartition::getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib) {
+    slock l(efpMapMutex_);
     efpMapItr_t i = efpMap_.find(efpDataSize_kib);
     if (i == efpMap_.end())
         return 0;
@@ -102,21 +88,19 @@ EmptyFilePool* EmptyFilePoolPartition::g
 }
 
 void EmptyFilePoolPartition::getEmptyFilePools(std::vector<EmptyFilePool*>& efpList) {
+    slock l(efpMapMutex_);
     for (efpMapItr_t i=efpMap_.begin(); i!=efpMap_.end(); ++i) {
         efpList.push_back(i->second);
     }
 }
 
 void EmptyFilePoolPartition::getEmptyFilePoolSizes_kib(std::vector<efpDataSize_kib_t>& efpDataSizesList_kib) const {
+    slock l(efpMapMutex_);
     for (efpMapConstItr_t i=efpMap_.begin(); i!=efpMap_.end(); ++i) {
         efpDataSizesList_kib.push_back(i->first);
     }
 }
 
-bool EmptyFilePoolPartition::getOverwriteBeforeReturnFlag() const {
-    return overwriteBeforeReturnFlag_;
-}
-
 std::string EmptyFilePoolPartition::getPartitionDirectory() const {
     return partitionDir_;
 }
@@ -125,6 +109,32 @@ efpPartitionNumber_t EmptyFilePoolPartit
     return partitionNum_;
 }
 
+std::string EmptyFilePoolPartition::toString(const uint16_t indent) const {
+    std::string indentStr(indent, ' ');
+    std::stringstream oss;
+    oss << "EFP Partition " <<  partitionNum_ << ":" << std::endl;
+    oss << indentStr  << "EFP Partition Analysis (partition " << partitionNum_ << " at \"" << partitionDir_ << "\"):" << std::endl;
+    if (efpMap_.empty()) {
+        oss << indentStr << "<Partition empty, no EFPs found>" << std::endl;
+    } else {
+        oss << indentStr << std::setw(12) << "efp_size_kib"
+                         << std::setw(12) << "num_files"
+                         << std::setw(18) << "tot_capacity_kib" << std::endl;
+        oss << indentStr << std::setw(12) << "------------"
+                         << std::setw(12) << "----------"
+                         << std::setw(18) << "----------------" << std::endl;
+        {
+            slock l(efpMapMutex_);
+            for (efpMapConstItr_t i=efpMap_.begin(); i!= efpMap_.end(); ++i) {
+                oss << indentStr << std::setw(12) << i->first
+                                 << std::setw(12) << i->second->numEmptyFiles()
+                                 << std::setw(18) << i->second->cumFileSize_kib() << std::endl;
+            }
+        }
+    }
+    return oss.str();
+}
+
 // static
 std::string EmptyFilePoolPartition::getPartionDirectoryName(const efpPartitionNumber_t partitionNumber) {
     std::ostringstream oss;

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolPartition.h Tue Nov  4 14:58:47 2014
@@ -37,8 +37,6 @@ class JournalLog;
 
 class EmptyFilePoolPartition
 {
-public:
-    static const std::string s_efpTopLevelDir_;
 protected:
     typedef std::map<efpDataSize_kib_t, EmptyFilePool*> efpMap_t;
     typedef efpMap_t::iterator efpMapItr_t;
@@ -47,6 +45,7 @@ protected:
     const efpPartitionNumber_t partitionNum_;
     const std::string partitionDir_;
     const bool overwriteBeforeReturnFlag_;
+    const bool truncateFlag_;
     JournalLog& journalLogRef_;
     efpMap_t efpMap_;
     smutex efpMapMutex_;
@@ -55,6 +54,7 @@ public:
     EmptyFilePoolPartition(const efpPartitionNumber_t partitionNum,
                            const std::string& partitionDir,
                            const bool overwriteBeforeReturnFlag,
+                           const bool truncateFlag,
                            JournalLog& journalLogRef);
     virtual ~EmptyFilePoolPartition();
 
@@ -62,9 +62,9 @@ public:
     EmptyFilePool* getEmptyFilePool(const efpDataSize_kib_t efpDataSize_kib);
     void getEmptyFilePools(std::vector<EmptyFilePool*>& efpList);
     void getEmptyFilePoolSizes_kib(std::vector<efpDataSize_kib_t>& efpDataSizesList) const;
-    bool getOverwriteBeforeReturnFlag() const;
     std::string getPartitionDirectory() const;
     efpPartitionNumber_t getPartitionNumber() const;
+    std::string toString(const uint16_t indent) const;
 
     static std::string getPartionDirectoryName(const efpPartitionNumber_t partitionNumber);
     static efpPartitionNumber_t getPartitionNumber(const std::string& name);

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/EmptyFilePoolTypes.h Tue Nov  4 14:58:47 2014
@@ -23,6 +23,7 @@
 #define QPID_LINEARSTORE_JOURNAL_EMPTYFILEPOOLTYPES_H_
 
 #include <iostream>
+#include <sstream>
 #include <stdint.h>
 
 namespace qpid {
@@ -42,7 +43,13 @@ typedef struct efpIdentity_t {
     efpIdentity_t() : pn_(0), ds_(0) {}
     efpIdentity_t(efpPartitionNumber_t pn, efpDataSize_kib_t ds) : pn_(pn), ds_(ds) {}
     efpIdentity_t(const efpIdentity_t& ei) : pn_(ei.pn_), ds_(ei.ds_) {}
-    friend std::ostream& operator<<(std::ostream& os, efpIdentity_t& id) { os << "[" << id.pn_ << "," << id.ds_ << "]"; return os; }
+    friend std::ostream& operator<<(std::ostream& os, const efpIdentity_t& id) {
+        // This two-stage write allows this << operator to be used with std::setw() for formatted writes
+        std::ostringstream oss;
+        oss << id.pn_ << "," << id.ds_;
+        os << oss.str();
+        return os;
+    }
 } efpIdentity_t;
 
 }}}

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/LinearFileController.cpp Tue Nov  4 14:58:47 2014
@@ -96,7 +96,7 @@ uint64_t LinearFileController::getNextRe
 
 void LinearFileController::removeFileToEfp(const std::string& fileName) {
     if (emptyFilePoolPtr_) {
-        emptyFilePoolPtr_->returnEmptyFile(fileName);
+        emptyFilePoolPtr_->returnEmptyFileSymlink(fileName);
     }
 }
 
@@ -108,7 +108,7 @@ void LinearFileController::restoreEmptyF
 void LinearFileController::purgeEmptyFilesToEfp() {
     slock l(journalFileListMutex_);
     while (journalFileList_.front()->isNoEnqueuedRecordsRemaining() && journalFileList_.size() > 1) { // Can't purge last file, even if it has no enqueued records
-        emptyFilePoolPtr_->returnEmptyFile(journalFileList_.front()->getFqFileName());
+        emptyFilePoolPtr_->returnEmptyFileSymlink(journalFileList_.front()->getFqFileName());
         delete journalFileList_.front();
         journalFileList_.pop_front();
     }

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.cpp Tue Nov  4 14:58:47 2014
@@ -322,36 +322,7 @@ void RecoveryManager::setLinearFileContr
     }
 }
 
-std::string RecoveryManager::toString(const std::string& jid) {
-    std::ostringstream oss;
-    oss << "Recovery journal analysis (jid=\"" << jid << "\"):" << std::endl;
-    oss << "  Number of journal files = " << fileNumberMap_.size() << std::endl;
-    oss << "  Journal File List:" << std::endl;
-    for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
-        std::string fqFileName = k->second->journalFilePtr_->getFqFileName();
-        oss << "    " << k->first << ": " << fqFileName.substr(fqFileName.rfind('/')+1) << std::endl;
-    }
-    oss << "  Enqueue Counts: [ ";
-    for (fileNumberMapConstItr_t l=fileNumberMap_.begin(); l!=fileNumberMap_.end(); ++l) {
-        if (l != fileNumberMap_.begin()) {
-            oss << ", ";
-        }
-        oss << l->second->journalFilePtr_->getEnqueuedRecordCount();
-    }
-    oss << " ]" << std::endl;
-    oss << "  Journal empty = " << (journalEmptyFlag_ ? "TRUE" : "FALSE") << std::endl;
-    oss << "  First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
-            std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
-    oss << "  End offset = 0x" << std::hex << endOffset_ << std::dec << " ("  <<
-            (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
-    oss << "  Highest rid = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
-    oss << "  Highest file number = 0x" << std::hex << highestFileNumber_ << std::dec << std::endl;
-    oss << "  Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
-    oss << "  Enqueued records (txn & non-txn):" << std::endl;
-    return oss.str();
-}
-
-std::string RecoveryManager::toLog(const std::string& jid, const int indent) {
+std::string RecoveryManager::toString(const std::string& jid, const uint16_t indent) const {
     std::string indentStr(indent, ' ');
     std::ostringstream oss;
     oss << std::endl << indentStr  << "Journal recovery analysis (jid=\"" << jid << "\"):" << std::endl;
@@ -360,18 +331,17 @@ std::string RecoveryManager::toLog(const
     } else {
         oss << indentStr << std::setw(7) << "file_id"
                          << std::setw(43) << "file_name"
-                         << std::setw(16) << "fro"
                          << std::setw(12) << "record_cnt"
-                         << std::setw(5) << "ptn"
-                         << std::setw(10) << "efp"
+                         << std::setw(16) << "fro"
+                         << std::setw(12) << "efp_id"
                          << std::endl;
         oss << indentStr << std::setw(7) << "-------"
                          << std::setw(43) << "-----------------------------------------"
+                         << std::setw(12) << "----------"
                          << std::setw(16) << "--------------"
                          << std::setw(12) << "----------"
-                         << std::setw(5) << "---"
-                         << std::setw(10) << "--------"
                          << std::endl;
+        uint32_t totalRecordCount(0UL);
         for (fileNumberMapConstItr_t k=fileNumberMap_.begin(); k!=fileNumberMap_.end(); ++k) {
             std::string fqFileName = k->second->journalFilePtr_->getFqFileName();
             std::ostringstream fid;
@@ -380,19 +350,20 @@ std::string RecoveryManager::toLog(const
             fro << std::hex << "0x" << k->second->journalFilePtr_->getFirstRecordOffset();
             oss << indentStr << std::setw(7) << fid.str()
                              << std::setw(43) << fqFileName.substr(fqFileName.rfind('/')+1)
-                             << std::setw(16) << fro.str()
                              << std::setw(12) << k->second->journalFilePtr_->getEnqueuedRecordCount()
-                             << std::setw(5) << k->second->journalFilePtr_->getEfpIdentity().pn_
-                             << std::setw(9) << k->second->journalFilePtr_->getEfpIdentity().ds_ << "k"
+                             << std::setw(16) << fro.str()
+                             << std::setw(12) << k->second->journalFilePtr_->getEfpIdentity()
                              << std::endl;
+            totalRecordCount += k->second->journalFilePtr_->getEnqueuedRecordCount();
         }
+        oss << indentStr << std::setw(62) << "----------" << std::endl;
+        oss << indentStr << std::setw(62) << totalRecordCount << std::endl;
         oss << indentStr << "First record offset in first file = 0x" << std::hex << firstRecordOffset_ <<
                 std::dec << " (" << (firstRecordOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
         oss << indentStr << "End offset in last file = 0x" << std::hex << endOffset_ << std::dec << " ("  <<
                 (endOffset_/QLS_DBLK_SIZE_BYTES) << " dblks)" << std::endl;
         oss << indentStr << "Highest rid found = 0x" << std::hex << highestRecordId_ << std::dec << std::endl;
         oss << indentStr << "Last file full = " << (lastFileFullFlag_ ? "TRUE" : "FALSE") << std::endl;
-        //oss << indentStr << "Enqueued records (txn & non-txn):"; // TODO: complete report
     }
     return oss.str();
 }
@@ -942,7 +913,7 @@ bool RecoveryManager::readJournalFileHea
 void RecoveryManager::removeEmptyFiles(EmptyFilePool* emptyFilePoolPtr) {
     while (fileNumberMap_.begin()->second->journalFilePtr_->getEnqueuedRecordCount() == 0 && fileNumberMap_.size() > 1) {
         RecoveredFileData_t* rfdp = fileNumberMap_.begin()->second;
-        emptyFilePoolPtr->returnEmptyFile(rfdp->journalFilePtr_->getFqFileName());
+        emptyFilePoolPtr->returnEmptyFileSymlink(rfdp->journalFilePtr_->getFqFileName());
         delete rfdp->journalFilePtr_;
         delete rfdp;
         fileNumberMap_.erase(fileNumberMap_.begin()->first);

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/RecoveryManager.h Tue Nov  4 14:58:47 2014
@@ -125,8 +125,7 @@ public:
     void recoveryComplete();
     void setLinearFileControllerJournals(lfcAddJournalFileFn fnPtr,
                                          LinearFileController* lfcPtr);
-    std::string toString(const std::string& jid);
-    std::string toLog(const std::string& jid, const int indent);
+    std::string toString(const std::string& jid, const uint16_t indent) const;
 protected:
     void analyzeJournalFileHeaders(efpIdentity_t& efpIdentity);
     void checkFileStreamOk(bool checkEof);

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jcntl.cpp Tue Nov  4 14:58:47 2014
@@ -119,7 +119,7 @@ jcntl::recover(EmptyFilePoolManager* efp
     assert(_emptyFilePoolPtr != 0);
 
     highest_rid = _recoveryManager.getHighestRecordId();
-    _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toLog(_jid, 5));
+    _jrnl_log.log(/*LOG_DEBUG*/JournalLog::LOG_INFO, _jid, _recoveryManager.toString(_jid, 5U));
     _linearFileController.initialize(_jdir.dirname(), _emptyFilePoolPtr, _recoveryManager.getHighestFileNumber());
     _recoveryManager.setLinearFileControllerJournals(&qpid::linearstore::journal::LinearFileController::addJournalFile, &_linearFileController);
     if (_recoveryManager.isLastFileFull()) {

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jerrno.cpp Tue Nov  4 14:58:47 2014
@@ -112,6 +112,7 @@ const uint32_t jerrno::JERR_EFP_BADPARTI
 const uint32_t jerrno::JERR_EFP_BADEFPDIRNAME    = 0x0d03;
 const uint32_t jerrno::JERR_EFP_NOEFP            = 0x0d04;
 const uint32_t jerrno::JERR_EFP_EMPTY            = 0x0d05;
+const uint32_t jerrno::JERR_EFP_SYMLINK          = 0x0d06;
 
 // Negative returns for some functions
 const int32_t jerrno::AIO_TIMEOUT                = -1;
@@ -206,6 +207,7 @@ jerrno::__init()
     _err_map[JERR_EFP_BADPARTITIONDIR] = "JERR_EFP_BADPARTITIONDIR: Invalid partition directory";
     _err_map[JERR_EFP_NOEFP] = "JERR_EFP_NOEFP: No Empty File Pool found for given partition and empty file size";
     _err_map[JERR_EFP_EMPTY] = "JERR_EFP_EMPTY: Empty File Pool is empty";
+    _err_map[JERR_EFP_SYMLINK] = "JERR_EFP_SYMLINK: Symbolic link operation failed";
 
     //_err_map[] = "";
 

Modified: qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jerrno.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jerrno.h?rev=1636598&r1=1636597&r2=1636598&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jerrno.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/linearstore/journal/jerrno.h Tue Nov  4 14:58:47 2014
@@ -130,6 +130,7 @@ namespace journal {
         static const uint32_t JERR_EFP_BADPARTITIONDIR; ///< Invalid partition directory
         static const uint32_t JERR_EFP_NOEFP;           ///< No EFP found for given partition and file size
         static const uint32_t JERR_EFP_EMPTY;           ///< EFP empty
+        static const uint32_t JERR_EFP_SYMLINK;         ///< Symbolic Link operation failed
 
         // Negative returns for some functions
         static const int32_t AIO_TIMEOUT;               ///< Timeout waiting for AIO return



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org