You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ch...@apache.org on 2013/01/07 21:17:19 UTC

svn commit: r1429990 - in /qpid/trunk/qpid/cpp/src/qpid/legacystore: JournalImpl.cpp JournalImpl.h MessageStoreImpl.cpp MessageStoreImpl.h StorePlugin.cpp

Author: chug
Date: Mon Jan  7 20:17:19 2013
New Revision: 1429990

URL: http://svn.apache.org/viewvc?rev=1429990&view=rev
Log:
QPID-1726 ASF licensed Qpid store
Update legacystore from changes in source svn revisions 4514 to 4528.


Modified:
    qpid/trunk/qpid/cpp/src/qpid/legacystore/JournalImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/legacystore/JournalImpl.h
    qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.h
    qpid/trunk/qpid/cpp/src/qpid/legacystore/StorePlugin.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/legacystore/JournalImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/JournalImpl.cpp?rev=1429990&r1=1429989&r2=1429990&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/JournalImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/JournalImpl.cpp Mon Jan  7 20:17:19 2013
@@ -68,7 +68,6 @@ JournalImpl::JournalImpl(qpid::sys::Time
                          _dlen(0),
                          _dtok(),
                          _external(false),
-                         _mgmtObject(),
                          deleteCallback(onDelete)
 {
     getEventsFireEventsPtr = new GetEventsFireEvent(this, getEventsTimeout);
@@ -97,8 +96,9 @@ JournalImpl::~JournalImpl()
     inactivityFireEventPtr->cancel();
     free_read_buffers();
 
-    if (_mgmtObject != 0) {
+    if (_mgmtObject.get() != 0) {
         _mgmtObject->resourceDestroy();
+	_mgmtObject.reset();
     }
 
     log(LOG_NOTICE, "Destroyed");
@@ -148,7 +148,7 @@ JournalImpl::initialize(const u_int16_t 
     jcntl::initialize(num_jfiles, auto_expand, ae_max_jfiles, jfsize_sblks, wcache_num_pages, wcache_pgsize_sblks, cbp);
     log(LOG_DEBUG, "Initialization complete");
 
-    if (_mgmtObject != 0)
+    if (_mgmtObject.get() != 0)
     {
         _mgmtObject->set_initialFileCount(_lpmgr.num_jfiles());
         _mgmtObject->set_autoExpand(_lpmgr.is_ae());
@@ -182,7 +182,7 @@ JournalImpl::recover(const u_int16_t num
     oss1 << " wcache_num_pages=" << wcache_num_pages;
     log(LOG_DEBUG, oss1.str());
 
-    if (_mgmtObject != 0)
+    if (_mgmtObject.get() != 0)
     {
         _mgmtObject->set_initialFileCount(_lpmgr.num_jfiles());
         _mgmtObject->set_autoExpand(_lpmgr.is_ae());
@@ -227,7 +227,7 @@ JournalImpl::recover(const u_int16_t num
     oss2 << "; journal now read-only.";
     log(LOG_DEBUG, oss2.str());
 
-    if (_mgmtObject != 0)
+    if (_mgmtObject.get() != 0)
     {
         _mgmtObject->inc_recordDepth(_emap.size());
         _mgmtObject->inc_enqueues(_emap.size());
@@ -348,7 +348,7 @@ JournalImpl::enqueue_data_record(const v
 {
     handleIoResult(jcntl::enqueue_data_record(data_buff, tot_data_len, this_data_len, dtokp, transient));
 
-    if (_mgmtObject != 0)
+    if (_mgmtObject.get() != 0)
     {
         _mgmtObject->inc_enqueues();
         _mgmtObject->inc_recordDepth();
@@ -361,7 +361,7 @@ JournalImpl::enqueue_extern_data_record(
 {
     handleIoResult(jcntl::enqueue_extern_data_record(tot_data_len, dtokp, transient));
 
-    if (_mgmtObject != 0)
+    if (_mgmtObject.get() != 0)
     {
         _mgmtObject->inc_enqueues();
         _mgmtObject->inc_recordDepth();
@@ -372,11 +372,11 @@ void
 JournalImpl::enqueue_txn_data_record(const void* const data_buff, const size_t tot_data_len,
         const size_t this_data_len, data_tok* dtokp, const std::string& xid, const bool transient)
 {
-    bool txn_incr = _mgmtObject != 0 ? _tmap.in_map(xid) : false;
+    bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false;
 
     handleIoResult(jcntl::enqueue_txn_data_record(data_buff, tot_data_len, this_data_len, dtokp, xid, transient));
 
-    if (_mgmtObject != 0)
+    if (_mgmtObject.get() != 0)
     {
         if (!txn_incr) // If this xid was not in _tmap, it will be now...
             _mgmtObject->inc_txn();
@@ -390,11 +390,11 @@ void
 JournalImpl::enqueue_extern_txn_data_record(const size_t tot_data_len, data_tok* dtokp,
         const std::string& xid, const bool transient)
 {
-    bool txn_incr = _mgmtObject != 0 ? _tmap.in_map(xid) : false;
+    bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false;
 
     handleIoResult(jcntl::enqueue_extern_txn_data_record(tot_data_len, dtokp, xid, transient));
 
-    if (_mgmtObject != 0)
+    if (_mgmtObject.get() != 0)
     {
         if (!txn_incr) // If this xid was not in _tmap, it will be now...
             _mgmtObject->inc_txn();
@@ -409,7 +409,7 @@ JournalImpl::dequeue_data_record(data_to
 {
     handleIoResult(jcntl::dequeue_data_record(dtokp, txn_coml_commit));
 
-    if (_mgmtObject != 0)
+    if (_mgmtObject.get() != 0)
     {
         _mgmtObject->inc_dequeues();
         _mgmtObject->inc_txnDequeues();
@@ -420,11 +420,11 @@ JournalImpl::dequeue_data_record(data_to
 void
 JournalImpl::dequeue_txn_data_record(data_tok* const dtokp, const std::string& xid, const bool txn_coml_commit)
 {
-    bool txn_incr = _mgmtObject != 0 ? _tmap.in_map(xid) : false;
+    bool txn_incr = _mgmtObject.get() != 0 ? _tmap.in_map(xid) : false;
 
     handleIoResult(jcntl::dequeue_txn_data_record(dtokp, xid, txn_coml_commit));
 
-    if (_mgmtObject != 0)
+    if (_mgmtObject.get() != 0)
     {
         if (!txn_incr) // If this xid was not in _tmap, it will be now...
             _mgmtObject->inc_txn();
@@ -439,7 +439,7 @@ JournalImpl::txn_abort(data_tok* const d
 {
     handleIoResult(jcntl::txn_abort(dtokp, xid));
 
-    if (_mgmtObject != 0)
+    if (_mgmtObject.get() != 0)
     {
         _mgmtObject->dec_txn();
         _mgmtObject->inc_txnAborts();
@@ -451,7 +451,7 @@ JournalImpl::txn_commit(data_tok* const 
 {
     handleIoResult(jcntl::txn_commit(dtokp, xid));
 
-    if (_mgmtObject != 0)
+    if (_mgmtObject.get() != 0)
     {
         _mgmtObject->dec_txn();
         _mgmtObject->inc_txnCommits();
@@ -466,8 +466,9 @@ JournalImpl::stop(bool block_till_aio_cm
     ifep->cancel();
     jcntl::stop(block_till_aio_cmpl);
 
-    if (_mgmtObject != 0) {
+    if (_mgmtObject.get() != 0) {
         _mgmtObject->resourceDestroy();
+        _mgmtObject.reset();
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/legacystore/JournalImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/JournalImpl.h?rev=1429990&r1=1429989&r2=1429990&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/JournalImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/JournalImpl.h Mon Jan  7 20:17:19 2013
@@ -166,7 +166,7 @@ class JournalImpl : public qpid::broker:
     bool loadMsgContent(u_int64_t rid, std::string& data, size_t length, size_t offset = 0);
 
     // Overrides for write inactivity timer
-    void enqueue_data_record(const void* const data_buffGetManagementObject, const size_t tot_data_len,
+    void enqueue_data_record(const void* const data_buff, const size_t tot_data_len,
                              const size_t this_data_len, mrg::journal::data_tok* dtokp,
                              const bool transient = false);
 
@@ -227,10 +227,10 @@ class JournalImpl : public qpid::broker:
 
     // Management instrumentation callbacks overridden from jcntl
     inline void instr_incr_outstanding_aio_cnt() {
-        if (_mgmtObject != 0) _mgmtObject->inc_outstandingAIOs();
+      if (_mgmtObject.get() != 0) _mgmtObject->inc_outstandingAIOs();
     }
     inline void instr_decr_outstanding_aio_cnt() {
-        if (_mgmtObject != 0) _mgmtObject->dec_outstandingAIOs();
+      if (_mgmtObject.get() != 0) _mgmtObject->dec_outstandingAIOs();
     }
 
 }; // class JournalImpl

Modified: qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp?rev=1429990&r1=1429989&r2=1429990&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.cpp Mon Jan  7 20:17:19 2013
@@ -57,7 +57,7 @@ MessageStoreImpl::TplRecoverStruct::TplR
                                                      tpc_flag(_tpc_flag)
 {}
 
-MessageStoreImpl::MessageStoreImpl(qpid::sys::Timer& timer_, const char* envpath) :
+MessageStoreImpl::MessageStoreImpl(qpid::broker::Broker* broker_, const char* envpath) :
                                    numJrnlFiles(0),
                                    autoJrnlExpand(false),
                                    autoJrnlExpandMaxFiles(0),
@@ -72,7 +72,7 @@ MessageStoreImpl::MessageStoreImpl(qpid:
                                    highestRid(0),
                                    isInit(false),
                                    envPath(envpath),
-                                   timer(timer_),
+                                   broker(broker_),
                                    mgmtObject(),
                                    agent(0)
 {}
@@ -218,7 +218,7 @@ void MessageStoreImpl::chkJrnlAutoExpand
     autoJrnlExpandMaxFiles = p;
 }
 
-void MessageStoreImpl::initManagement (qpid::broker::Broker* broker)
+void MessageStoreImpl::initManagement ()
 {
     if (broker != 0) {
         agent = broker->getManagementAgent();
@@ -364,7 +364,7 @@ void MessageStoreImpl::init()
             // NOTE: during normal initialization, agent == 0 because the store is initialized before the management infrastructure.
             // However during a truncated initialization in a cluster, agent != 0. We always pass 0 as the agent for the
             // TplStore to keep things consistent in a cluster. See https://bugzilla.redhat.com/show_bug.cgi?id=681026
-            tplStorePtr.reset(new TplJournalImpl(timer, "TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, 0));
+            tplStorePtr.reset(new TplJournalImpl(broker->getTimer(), "TplStore", getTplBaseDir(), "tpl", defJournalGetEventsTimeout, defJournalFlushTimeout, 0));
             isInit = true;
         } catch (const DbException& e) {
             if (e.get_errno() == DB_VERSION_MISMATCH)
@@ -402,8 +402,9 @@ void MessageStoreImpl::finalize()
         }
     }
 
-    if (mgmtObject != 0) {
+    if (mgmtObject.get() != 0) {
         mgmtObject->resourceDestroy();
+	mgmtObject.reset();
     }
 }
 
@@ -443,7 +444,7 @@ void MessageStoreImpl::chkTplStoreInit()
     if (!tplStorePtr->is_ready()) {
         journal::jdir::create_dir(getTplBaseDir());
         tplStorePtr->initialize(tplNumJrnlFiles, false, 0, tplJrnlFsizeSblks, tplWCacheNumPages, tplWCachePgSizeSblks);
-        if (mgmtObject != 0) mgmtObject->set_tplIsInitialized(true);
+        if (mgmtObject.get() != 0) mgmtObject->set_tplIsInitialized(true);
     }
 }
 
@@ -479,8 +480,9 @@ MessageStoreImpl::~MessageStoreImpl()
         QPID_LOG(error, "Unknown error in MessageStoreImpl::~MessageStoreImpl()");
     }
 
-    if (mgmtObject != 0) {
+    if (mgmtObject.get() != 0) {
         mgmtObject->resourceDestroy();
+	mgmtObject.reset();
     }
 }
 
@@ -513,7 +515,7 @@ void MessageStoreImpl::create(qpid::brok
         return;
     }
 
-    jQueue = new JournalImpl(timer, queue.getName(), getJrnlDir(queue),  std::string("JournalData"),
+    jQueue = new JournalImpl(broker->getTimer(), queue.getName(), getJrnlDir(queue),  std::string("JournalData"),
                              defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
                              boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
     {
@@ -705,7 +707,7 @@ void MessageStoreImpl::recover(qpid::bro
     //recover transactions:
     for (txn_list::iterator i = prepared.begin(); i != prepared.end(); i++) {
         const PreparedTransaction pt = *i;
-        if (mgmtObject != 0) {
+        if (mgmtObject.get() != 0) {
             mgmtObject->inc_tplTransactionDepth();
             mgmtObject->inc_tplTxnPrepares();
         }
@@ -799,7 +801,7 @@ void MessageStoreImpl::recoverQueues(Txn
             QPID_LOG(error, "Cannot recover empty (null) queue name - ignoring and attempting to continue.");
             break;
         }
-        jQueue = new JournalImpl(timer, queueName, getJrnlHashDir(queueName), std::string("JournalData"),
+        jQueue = new JournalImpl(broker->getTimer(), queueName, getJrnlHashDir(queueName), std::string("JournalData"),
                                  defJournalGetEventsTimeout, defJournalFlushTimeout, agent,
                                  boost::bind(&MessageStoreImpl::journalDeleted, this, _1));
         {
@@ -979,6 +981,8 @@ void MessageStoreImpl::recoverMessages(T
                 // At some future point if delivery attempts are stored, then this call would
                 // become optional depending on that information.
                 msg->setRedelivered();
+		// Reset the TTL for the recovered message
+		msg->computeExpiration(broker->getExpiryPolicy());
 
                 u_int32_t contentOffset = headerSize + preambleLength;
                 u_int64_t contentSize = readSize - contentOffset;
@@ -1436,7 +1440,7 @@ void MessageStoreImpl::completed(TxnCtxt
             tplStorePtr->dequeue_txn_data_record(txn.getDtok(), txn.getXid(), commit);
         }
         txn.complete(commit);
-        if (mgmtObject != 0) {
+        if (mgmtObject.get() != 0) {
             mgmtObject->dec_tplTransactionDepth();
             if (commit)
                 mgmtObject->inc_tplTxnCommits();
@@ -1490,7 +1494,7 @@ void MessageStoreImpl::localPrepare(TxnC
         ctxt->prepare(tplStorePtr.get());
         // make sure all the data is written to disk before returning
         ctxt->sync();
-        if (mgmtObject != 0) {
+        if (mgmtObject.get() != 0) {
             mgmtObject->inc_tplTransactionDepth();
             mgmtObject->inc_tplTxnPrepares();
         }

Modified: qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.h?rev=1429990&r1=1429989&r2=1429990&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/MessageStoreImpl.h Mon Jan  7 20:17:19 2013
@@ -150,7 +150,7 @@ class MessageStoreImpl : public qpid::br
     u_int64_t highestRid;
     bool isInit;
     const char* envPath;
-    qpid::sys::Timer& timer;
+    qpid::broker::Broker* broker;
 
     qmf::org::apache::qpid::legacystore::Store::shared_ptr mgmtObject;
     qpid::management::ManagementAgent* agent;
@@ -273,7 +273,7 @@ class MessageStoreImpl : public qpid::br
   public:
     typedef boost::shared_ptr<MessageStoreImpl> shared_ptr;
 
-    MessageStoreImpl(qpid::sys::Timer& timer, const char* envpath = 0);
+    MessageStoreImpl(qpid::broker::Broker* broker, const char* envpath = 0);
 
     virtual ~MessageStoreImpl();
 
@@ -292,7 +292,7 @@ class MessageStoreImpl : public qpid::br
 
     void truncateInit(const bool saveStoreContent = false);
 
-    void initManagement (qpid::broker::Broker* broker);
+    void initManagement ();
 
     void finalize();
 
@@ -364,9 +364,7 @@ class MessageStoreImpl : public qpid::br
     qpid::management::ManagementObject::shared_ptr GetManagementObject (void) const
         { return mgmtObject; }
 
-    using qpid::management::Manageable::ManagementMethod;
-
-    inline qpid::management::Manageable::status_t ManagementMethod (u_int32_t, qpid::management::Args&)
+    inline qpid::management::Manageable::status_t ManagementMethod (u_int32_t, qpid::management::Args&, std::string&)
         { return qpid::management::Manageable::STATUS_OK; }
 
     std::string getStoreDir() const;

Modified: qpid/trunk/qpid/cpp/src/qpid/legacystore/StorePlugin.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/legacystore/StorePlugin.cpp?rev=1429990&r1=1429989&r2=1429990&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/legacystore/StorePlugin.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/legacystore/StorePlugin.cpp Mon Jan  7 20:17:19 2013
@@ -44,7 +44,7 @@ struct StorePlugin : public Plugin {
     {
         Broker* broker = dynamic_cast<Broker*>(&target);
         if (!broker) return;
-        store.reset(new MessageStoreImpl(broker->getTimer()));
+        store.reset(new MessageStoreImpl(broker));
         DataDir& dataDir = broker->getDataDir ();
         if (options.storeDir.empty ())
         {
@@ -65,7 +65,7 @@ struct StorePlugin : public Plugin {
         if (!broker) return;
         if (!store) return;
         QPID_LOG(info, "Enabling management instrumentation for the store.");
-        store->initManagement(broker);
+        store->initManagement();
     }
 
     void finalize()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org