You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/01/29 15:48:55 UTC

svn commit: r1562466 [6/6] - in /qpid/branches/java-broker-bdb-ha: ./ qpid/ qpid/cpp/src/ qpid/cpp/src/qpid/client/ qpid/cpp/src/qpid/framing/ qpid/cpp/src/qpid/ha/ qpid/cpp/src/qpid/messaging/amqp/ qpid/cpp/src/tests/ qpid/java/ qpid/java/amqp-1-0-cli...

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java?rev=1562466&r1=1562465&r2=1562466&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java Wed Jan 29 14:48:50 2014
@@ -26,7 +26,7 @@ import java.util.UUID;
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.EnqueueableMessage;
 import org.apache.qpid.server.message.ServerMessage;
 import org.apache.qpid.server.model.VirtualHost;
 
@@ -265,7 +265,7 @@ public class SlowMessageStore implements
             _underlying = underlying;
         }
 
-        public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message)
+        public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
                 throws AMQStoreException
         {
             doPreDelay("enqueueMessage");
@@ -273,7 +273,7 @@ public class SlowMessageStore implements
             doPostDelay("enqueueMessage");
         }
 
-        public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message)
+        public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
                 throws AMQStoreException
         {
             doPreDelay("dequeueMessage");

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/PortRestTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/PortRestTest.java?rev=1562466&r1=1562465&r2=1562466&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/PortRestTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/PortRestTest.java Wed Jan 29 14:48:50 2014
@@ -29,6 +29,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.qpid.server.BrokerOptions;
 import org.apache.qpid.server.model.AuthenticationProvider;
 import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Protocol;
@@ -288,4 +289,38 @@ public class PortRestTest extends QpidRe
         Map<String, Object> port = getRestTestHelper().getJsonAsSingletonList("/rest/port/" + portName);
         assertEquals("Unexpected auth provider", ANONYMOUS_AUTHENTICATION_PROVIDER, port.get(Port.AUTHENTICATION_PROVIDER));
     }
+
+    public void testDefaultAmqpPortIsQuiescedWhenInManagementMode() throws Exception
+    {
+        // restart Broker in management mode
+        stopBroker();
+        startBroker(0, true);
+        getRestTestHelper().setUsernameAndPassword(BrokerOptions.MANAGEMENT_MODE_USER_NAME, MANAGEMENT_MODE_PASSWORD);
+
+        String ampqPortName = TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT;
+        Map<String, Object> portData = getRestTestHelper().getJsonAsSingletonList("/rest/port/" + URLDecoder.decode(ampqPortName, "UTF-8"));
+        Asserts.assertPortAttributes(portData, State.QUIESCED);
+    }
+
+    public void testNewPortQuiescedIfPortNumberWasUsed() throws Exception
+    {
+        String ampqPortName = TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT;
+        Map<String, Object> portData = getRestTestHelper().getJsonAsSingletonList("/rest/port/" + URLDecoder.decode(ampqPortName, "UTF-8"));
+        int amqpPort = (Integer)portData.get(Port.PORT);
+
+        int deleteResponseCode = getRestTestHelper().submitRequest("/rest/port/" + ampqPortName, "DELETE", null);
+        assertEquals("Port deletion should be allowed", 200, deleteResponseCode);
+
+        String newPortName = "reused-port";
+        Map<String, Object> attributes = new HashMap<String, Object>();
+        attributes.put(Port.NAME, newPortName);
+        attributes.put(Port.PORT, amqpPort);  // reuses port that was previously in use
+        attributes.put(Port.AUTHENTICATION_PROVIDER, TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER);
+
+        int responseCode = getRestTestHelper().submitRequest("/rest/port/" + newPortName, "PUT", attributes);
+        assertEquals("Unexpected response code for port creation", 201, responseCode);
+
+        portData = getRestTestHelper().getJsonAsSingletonList("/rest/port/" + URLDecoder.decode(newPortName, "UTF-8"));
+        Asserts.assertPortAttributes(portData, State.QUIESCED);
+    }
 }

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java?rev=1562466&r1=1562465&r2=1562466&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java Wed Jan 29 14:48:50 2014
@@ -281,8 +281,7 @@ public class BrokerACLTest extends QpidR
         assertPortExists(portName);
     }
 
-    // TODO:  test disabled until allowing the deletion of active ports outside management mode
-    public void DISABLED_testDeletePortAllowed() throws Exception
+    public void testDeletePortAllowed() throws Exception
     {
         getRestTestHelper().setUsernameAndPassword(ALLOWED_USER, ALLOWED_USER);
 

Modified: qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java?rev=1562466&r1=1562465&r2=1562466&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java Wed Jan 29 14:48:50 2014
@@ -57,7 +57,7 @@ public class SyncWaitDelayTest extends Q
 
         final String prefix = "virtualhosts.virtualhost." + VIRTUALHOST;
         setVirtualHostConfigurationProperty(prefix + ".type", StandardVirtualHostFactory.TYPE);
-        setVirtualHostConfigurationProperty(prefix + ".store.class", "org.apache.qpid.server.store.SlowMessageStore");
+        setVirtualHostConfigurationProperty(prefix + ".store.class", org.apache.qpid.server.store.SlowMessageStore.class.getName());
         setVirtualHostConfigurationProperty(prefix + ".store.delays.commitTran.post", String.valueOf(POST_COMMIT_DELAY));
 
         super.setUp();

Propchange: qpid/branches/java-broker-bdb-ha/qpid/python/
------------------------------------------------------------------------------
  Merged /qpid/trunk/qpid/python:r1560620-1562452

Modified: qpid/branches/java-broker-bdb-ha/qpid/python/qpid-python-test
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/python/qpid-python-test?rev=1562466&r1=1562465&r2=1562466&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/python/qpid-python-test (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/python/qpid-python-test Wed Jan 29 14:48:50 2014
@@ -633,7 +633,7 @@ if not list_only:
 if xmlr:
    xmlr.end()
 
-if failed or skipped:
+if failed:
   sys.exit(1)
 else:
   sys.exit(0)

Modified: qpid/branches/java-broker-bdb-ha/qpid/python/qpid/compat.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/python/qpid/compat.py?rev=1562466&r1=1562465&r2=1562466&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/python/qpid/compat.py (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/python/qpid/compat.py Wed Jan 29 14:48:50 2014
@@ -54,7 +54,8 @@ class BaseWaiter:
   def wait(self, timeout=None):
     start = time.time()
     if timeout is not None:
-      while True:
+      ready = False
+      while timeout > 0:
         try:
           ready, _, _ = select([self], [], [], timeout)
           break

Modified: qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qls/efp.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qls/efp.py?rev=1562466&r1=1562465&r2=1562466&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qls/efp.py (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qls/efp.py Wed Jan 29 14:48:50 2014
@@ -40,7 +40,7 @@ class EfpManager(object):
             print
             for ptn in self.partitions:
                 ptn.report()
-    def run(self, args):
+    def run(self, _):
         for dir_entry in os.listdir(self.directory):
             try:
                 efpp = EfpPartition(os.path.join(self.directory, dir_entry))

Modified: qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qls/err.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qls/err.py?rev=1562466&r1=1562465&r2=1562466&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qls/err.py (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qls/err.py Wed Jan 29 14:48:50 2014
@@ -43,7 +43,7 @@ class AlreadyLockedError(QlsRecordError)
     def __str__(self):
         return 'Transactional operation already locked in TransactionMap: ' + QlsRecordError.__str__(self)
 
-class DatqaSizeError(QlsError):
+class DataSizeError(QlsError):
     """Error class for Data size mismatch"""
     def __init__(self, expected_size, actual_size, data_str):
         QlsError.__init__(self)
@@ -61,6 +61,13 @@ class DuplicateRecordIdError(QlsRecordEr
     def __str__(self):
         return 'Duplicate Record Id in enqueue map: ' + QlsRecordError.__str__(self)
 
+class EnqueueCountUnderflowError(QlsRecordError):
+    """Attempted to decrement enqueue count past 0"""
+    def __init__(self, file_header, record):
+        QlsRecordError.__init__(self, file_header, record)
+    def __str__(self):
+        return 'Enqueue record count underflow: ' + QlsRecordError.__str__(self)
+
 class ExternalDataError(QlsRecordError):
     """Data present in Enqueue record when external data flag is set"""
     def __init__(self, file_header, record):
@@ -75,6 +82,13 @@ class FirstRecordOffsetMismatchError(Qls
     def __str__(self):
         return 'First record offset mismatch: ' + QlsRecordError.__str__(self) + ' expected_offset=0x%x' % \
             self.file_header.first_record_offset
+            
+class InvalidClassError(QlsError):
+    """Invalid class name or type"""
+    def __init__(self, class_name):
+        self.class_name = class_name
+    def __str__(self):
+        return 'Invalid class name "%s"' % self.class_name
 
 class InvalidEfpDirectoryNameError(QlsError):
     """Invalid EFP directory name - should be NNNNk, where NNNN is a number (of any length)"""

Modified: qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qls/jrnl.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qls/jrnl.py?rev=1562466&r1=1562465&r2=1562466&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qls/jrnl.py (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qls/jrnl.py Wed Jan 29 14:48:50 2014
@@ -20,6 +20,7 @@
 import os
 import os.path
 import qls.err
+import string
 import struct
 from time import gmtime, strftime
 
@@ -46,25 +47,55 @@ class JournalRecoveryManager(object):
         self.journals = {}
         self.high_rid_counter = HighCounter()
     def report(self, print_stats_flag):
-        if not self.tpl is None:
+        if self.tpl is not None:
             self.tpl.report(print_stats_flag)
         for queue_name in sorted(self.journals.keys()):
             self.journals[queue_name].report(print_stats_flag)
     def run(self, args):
         tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME)
         if os.path.exists(tpl_dir):
-            self.tpl = Journal(os.path.join(self.directory, tpl_dir), None)
+            self.tpl = Journal(tpl_dir, None)
             self.tpl.recover(self.high_rid_counter)
             print
         jrnl_dir = os.path.join(self.directory, JournalRecoveryManager.JRNL_DIR_NAME)
+        prepared_list = self.tpl.txn_map.get_prepared_list() if self.tpl is not None else {} # no TPL dir: nothing prepared
         if os.path.exists(jrnl_dir):
-            for dir_entry in os.listdir(jrnl_dir):
-                jrnl = Journal(os.path.join(jrnl_dir, dir_entry), self.tpl.get_outstanding_transaction_list())
+            for dir_entry in sorted(os.listdir(jrnl_dir)):
+                jrnl = Journal(os.path.join(jrnl_dir, dir_entry), prepared_list)
                 jrnl.recover(self.high_rid_counter)
                 self.journals[jrnl.get_queue_name()] = jrnl
-                if args.txn:
-                    jrnl.reconcile_transactions(self.high_rid_counter)
                 print
+        self._reconcile_transactions(prepared_list, args.txn)
+    def _reconcile_transactions(self, prepared_list, txn_flag): # report prepared txns; modify maps only if txn_flag
+        print 'Transaction reconciliation report:'
+        print len(prepared_list), 'open transaction(s) found in prepared transaction list:'
+        for xid in prepared_list.keys():
+            commit_flag = prepared_list[xid]
+            if commit_flag is None:
+                status = '[Prepared, neither committed nor aborted - assuming commit]'
+            elif commit_flag:
+                status = '[Prepared, but interrupted during commit phase]'
+            else:
+                status = '[Prepared, but interrupted during abort phase]'
+            print ' ', Utils.format_xid(xid), status
+            if commit_flag is None: # Prepared, but not committed or aborted
+                enqueue_record = self.tpl.get_txn_map_record(xid)[0][1] # entries are [journal_file, record, lock]
+                dequeue_record = Utils.create_record('QLSd', DequeueRecord.TXN_COMPLETE_COMMIT_FLAG, \
+                                                     self.tpl.current_journal_file, self.high_rid_counter.get_next(), \
+                                                     enqueue_record.record_id, xid, None)
+                if txn_flag:
+                    self.tpl.add_record(dequeue_record)
+        for queue_name in sorted(self.journals.keys()):
+            self.journals[queue_name].reconcile_transactions(prepared_list, txn_flag)
+        if len(prepared_list) > 0:
+            print 'Completing prepared transactions in prepared transaction list:'
+        for xid in prepared_list.keys():
+            print ' ', Utils.format_xid(xid)
+            transaction_record = Utils.create_record('QLSc', 0, self.tpl.current_journal_file, \
+                                                     self.high_rid_counter.get_next(), None, xid, None)
+            if txn_flag:
+                self.tpl.add_record(transaction_record)
+        print
 
 class EnqueueMap(object):
     """
@@ -73,23 +104,29 @@ class EnqueueMap(object):
     def __init__(self, journal):
         self.journal = journal
         self.enq_map = {}
-    def add(self, file_header, enq_record, locked_flag):
+    def add(self, journal_file, enq_record, locked_flag):
         if enq_record.record_id in self.enq_map:
             raise qls.err.DuplicateRecordIdError(self.journal.current_file_header, enq_record)
-        self.enq_map[enq_record.record_id] = [file_header.file_num, enq_record, locked_flag]
+        self.enq_map[enq_record.record_id] = [journal_file, enq_record, locked_flag]
     def contains(self, rid):
         """Return True if the map contains the given rid"""
         return rid in self.enq_map
-    def delete(self, file_header, deq_record):
+    def delete(self, journal_file, deq_record):
         if deq_record.dequeue_record_id in self.enq_map:
+            enq_list = self.enq_map[deq_record.dequeue_record_id]
             del self.enq_map[deq_record.dequeue_record_id]
+            return enq_list
         else:
-            raise qls.err.RecordIdNotFoundError(file_header, deq_record)
-    def lock(self, file_header, dequeue_record):
-        if not dequeue_record.dequeue_record_id in self.enq_map:
-            raise qls.err.RecordIdNotFoundError(file_header, dequeue_record)
+            raise qls.err.RecordIdNotFoundError(journal_file.file_header, deq_record)
+    def get(self, record_id):
+        if record_id in self.enq_map:
+            return self.enq_map[record_id]
+        return None
+    def lock(self, journal_file, dequeue_record):
+        if dequeue_record.dequeue_record_id not in self.enq_map:
+            raise qls.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record)
         self.enq_map[dequeue_record.dequeue_record_id][2] = True
-    def report_str(self, show_stats, show_records):
+    def report_str(self, _, show_records):
         """Return a string containing a text report for all records in the map"""
         if len(self.enq_map) == 0:
             return 'No enqueued records found.'
@@ -99,24 +136,24 @@ class EnqueueMap(object):
             rid_list = self.enq_map.keys()
             rid_list.sort()
             for rid in rid_list:
-                file_num, record, locked_flag = self.enq_map[rid]
+                journal_file, record, locked_flag = self.enq_map[rid]
                 if locked_flag:
                     lock_str = '[LOCKED]'
                 else:
                     lock_str = ''
-                rstr += '\n  %d:%s %s' % (file_num, record, lock_str)
+                rstr += '\n  %d:%s %s' % (journal_file.file_header.file_num, record, lock_str)
         else:
             rstr += '.'
         return rstr
-    def unlock(self, file_header, dequeue_record):
+    def unlock(self, journal_file, dequeue_record):
         """Set the transaction lock for a given record_id to False"""
         if dequeue_record.dequeue_record_id in self.enq_map:
             if self.enq_map[dequeue_record.dequeue_record_id][2]:
                 self.enq_map[dequeue_record.dequeue_record_id][2] = False
             else:
-                raise qls.err.RecordNotLockedError(file_header, dequeue_record)
+                raise qls.err.RecordNotLockedError(journal_file.file_header, dequeue_record)
         else:
-            raise qls.err.RecordIdNotFoundError(file_header, dequeue_record)
+            raise qls.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record)
 
 class TransactionMap(object):
     """
@@ -127,38 +164,40 @@ class TransactionMap(object):
         self.enq_map = enq_map
     def abort(self, xid):
         """Perform an abort operation for the given xid record"""
-        for file_header, record, lock_flag in self.txn_map[xid]:
+        for journal_file, record, _ in self.txn_map[xid]:
             if isinstance(record, DequeueRecord):
                 if self.enq_map.contains(record.dequeue_record_id):
-                    self.enq_map.unlock(file_header, record)
+                    self.enq_map.unlock(journal_file, record)
+            else:
+                journal_file.decr_enq_cnt(record)
         del self.txn_map[xid]
-    def add(self, file_header, record):
+    def add(self, journal_file, record):
         if record.xid is None:
-            raise qls.err.NonTransactionalRecordError(file_header, record, 'TransactionMap.add()')
+            raise qls.err.NonTransactionalRecordError(journal_file.file_header, record, 'TransactionMap.add()')
         if isinstance(record, DequeueRecord):
             try:
-                self.enq_map.lock(file_header, record)
+                self.enq_map.lock(journal_file, record)
             except qls.err.RecordIdNotFoundError:
                 # Not in emap, look for rid in tmap - should not happen in practice
                 txn_op = self._find_record_id(record.xid, record.dequeue_record_id)
                 if txn_op != None:
                     if txn_op[2]:
-                        raise qls.err.AlreadyLockedError(file_header, record)
+                        raise qls.err.AlreadyLockedError(journal_file.file_header, record)
                     txn_op[2] = True
         if record.xid in self.txn_map:
-            self.txn_map[record.xid].append([file_header, record, False]) # append to existing list
+            self.txn_map[record.xid].append([journal_file, record, False]) # append to existing list
         else:
-            self.txn_map[record.xid] = [[file_header, record, False]] # create new list
+            self.txn_map[record.xid] = [[journal_file, record, False]] # create new list
     def commit(self, xid):
         """Perform a commit operation for the given xid record"""
         mismatch_list = []
-        for file_header, record, lock in self.txn_map[xid]:
+        for journal_file, record, lock in self.txn_map[xid]:
             if isinstance(record, EnqueueRecord):
-                self.enq_map.add(file_header, record, lock) # Transfer enq to emap
+                self.enq_map.add(journal_file, record, lock) # Transfer enq to emap
             else:
                 if self.enq_map.contains(record.dequeue_record_id):
-                    self.enq_map.unlock(file_header, record)
-                    self.enq_map.delete(file_header, record)
+                    self.enq_map.unlock(journal_file, record)
+                    self.enq_map.delete(journal_file, record)[0].decr_enq_cnt(record)
                 else:
                     mismatch_list.append('0x%x' % record.dequeue_record_id)
         del self.txn_map[xid]
@@ -166,27 +205,47 @@ class TransactionMap(object):
     def contains(self, xid):
         """Return True if the xid exists in the map; False otherwise"""
         return xid in self.txn_map
-    def delete(self, file_header, transaction_record):
+    def delete(self, journal_file, transaction_record):
         """Remove a transaction record from the map using either a commit or abort header"""
         if transaction_record.magic[-1] == 'c':
             return self.commit(transaction_record.xid)
         if transaction_record.magic[-1] == 'a':
             self.abort(transaction_record.xid)
         else:
-            raise qls.err.InvalidRecordTypeError(file_header, transaction_record, 'delete from Transaction Map')
+            raise qls.err.InvalidRecordTypeError(journal_file.file_header, transaction_record,
+                                                 'delete from Transaction Map')
+    def get(self, xid):
+        if xid in self.txn_map:
+            return self.txn_map[xid]
+        return None
+    def get_prepared_list(self):
+        """
+        Prepared list is a map of xid(key) to one of None, True or False. These represent respectively:
+        None: prepared, but neither committed nor aborted (interrupted before commit or abort)
+        False: prepared and aborted (interrupted before abort complete)
+        True: prepared and committed (interrupted before commit complete)
+        """
+        prepared_list = {}
+        for xid in self.get_xid_list():
+            for _, record, _ in self.txn_map[xid]:
+                if isinstance(record, EnqueueRecord):
+                    prepared_list[xid] = None
+                else:
+                    prepared_list[xid] = record.is_transaction_complete_commit()
+        return prepared_list
     def get_xid_list(self):
         return self.txn_map.keys()
-    def report_str(self, show_stats, show_records):
+    def report_str(self, _, show_records):
         """Return a string containing a text report for all records in the map"""
         if len(self.txn_map) == 0:
             return 'No outstanding transactions found.'
         rstr = '%d outstanding transaction(s)' % len(self.txn_map)
         if show_records:
             rstr += ':'
-            for xid, list in self.txn_map.iteritems():
-                rstr += '\n  %s containing %d operations:' % (Utils.format_xid(xid), len(list))
-                for file_header, record, locked_flag in list:
-                    rstr += '\n    %d:%s' % (file_header.file_num, record)
+            for xid, op_list in self.txn_map.iteritems():
+                rstr += '\n  %s containing %d operations:' % (Utils.format_xid(xid), len(op_list))
+                for journal_file, record, _ in op_list:
+                    rstr += '\n    %d:%s' % (journal_file.file_header.file_num, record)
         else:
             rstr += '.'
         return rstr
@@ -202,48 +261,48 @@ class TransactionMap(object):
                     return txn_op
         return None
 
+class JournalStatistics(object):
+    """Journal statistics"""
+    def __init__(self):
+        self.total_record_count = 0
+        self.transient_record_count = 0
+        self.filler_record_count = 0
+        self.enqueue_count = 0
+        self.dequeue_count = 0
+        self.transaction_record_count = 0
+        self.transaction_enqueue_count = 0
+        self.transaction_dequeue_count = 0
+        self.transaction_commit_count = 0
+        self.transaction_abort_count = 0
+        self.transaction_operation_count = 0
+    def __str__(self):
+        fstr = 'Total record count: %d\n' + \
+               'Transient record count: %d\n' + \
+               'Filler_record_count: %d\n' + \
+               'Enqueue_count: %d\n' + \
+               'Dequeue_count: %d\n' + \
+               'Transaction_record_count: %d\n' + \
+               'Transaction_enqueue_count: %d\n' + \
+               'Transaction_dequeue_count: %d\n' + \
+               'Transaction_commit_count: %d\n' + \
+               'Transaction_abort_count: %d\n' + \
+               'Transaction_operation_count: %d\n'
+        return fstr % (self.total_record_count,
+                       self.transient_record_count,
+                       self.filler_record_count,
+                       self.enqueue_count,
+                       self.dequeue_count,
+                       self.transaction_record_count,
+                       self.transaction_enqueue_count,
+                       self.transaction_dequeue_count,
+                       self.transaction_commit_count,
+                       self.transaction_abort_count,
+                       self.transaction_operation_count)
+
 class Journal(object):
     """
     Instance of a Qpid Linear Store (QLS) journal.
     """
-    class JournalStatistics(object):
-        """Journal statistics"""
-        def __init__(self):
-            self.total_record_count = 0
-            self.transient_record_count = 0
-            self.filler_record_count = 0
-            self.enqueue_count = 0
-            self.dequeue_count = 0
-            self.transaction_record_count = 0
-            self.transaction_enqueue_count = 0
-            self.transaction_dequeue_count = 0
-            self.transaction_commit_count = 0
-            self.transaction_abort_count = 0
-            self.transaction_operation_count = 0
-        def __str__(self):
-            fstr = 'Total record count: %d\n' + \
-                   'Transient record count: %d\n' + \
-                   'Filler_record_count: %d\n' + \
-                   'Enqueue_count: %d\n' + \
-                   'Dequeue_count: %d\n' + \
-                   'Transaction_record_count: %d\n' + \
-                   'Transaction_enqueue_count: %d\n' + \
-                   'Transaction_dequeue_count: %d\n' + \
-                   'Transaction_commit_count: %d\n' + \
-                   'Transaction_abort_count: %d\n' + \
-                   'Transaction_operation_count: %d\n'
-            return fstr % (self.total_record_count,
-                           self.transient_record_count,
-                           self.filler_record_count,
-                           self.enqueue_count,
-                           self.dequeue_count,
-                           self.transaction_record_count,
-                           self.transaction_enqueue_count,
-                           self.transaction_dequeue_count,
-                           self.transaction_commit_count,
-                           self.transaction_abort_count,
-                           self.transaction_operation_count)
-
     def __init__(self, directory, xid_prepared_list):
         self.directory = directory
         self.queue_name = os.path.basename(directory)
@@ -252,12 +311,25 @@ class Journal(object):
         self.file_num_itr = None
         self.enq_map = EnqueueMap(self)
         self.txn_map = TransactionMap(self.enq_map)
-        self.current_file_header = None
+        self.current_journal_file = None
         self.first_rec_flag = None
-        self.statistics = Journal.JournalStatistics()
-        self.warnings = []
+        self.statistics = JournalStatistics()
         self.xid_prepared_list = xid_prepared_list # This is None for the TPL instance only
-    def get_outstanding_transaction_list(self):
+    def add_record(self, record): # route record to the txn map (has xid) or enqueue map (no xid)
+        if isinstance(record, EnqueueRecord) or isinstance(record, DequeueRecord):
+            if record.xid_size > 0:
+                self.txn_map.add(self.current_journal_file, record)
+            else:
+                self.enq_map.add(self.current_journal_file, record, False)
+        elif isinstance(record, TransactionRecord):
+            self.txn_map.delete(self.current_journal_file, record)
+        else:
+            raise qls.err.InvalidRecordTypeError(self.current_journal_file.file_header, record, 'add to Journal')
+    def get_enq_map_record(self, rid):
+        return self.enq_map.get(rid)
+    def get_txn_map_record(self, xid):
+        return self.txn_map.get(xid)
+    def get_outstanding_txn_list(self):
         return self.txn_map.get_xid_list()
     def get_queue_name(self):
         return self.queue_name
@@ -271,15 +343,28 @@ class Journal(object):
             #print '[No more files in journal]' # DEBUG
             #print #DEBUG
             pass
-    def reconcile_transactions(self, high_rid_counter):
-        if not self.xid_prepared_list is None: # ie This is not the TPL instance
-            print 'Reconcile outstanding prepared transactions:'
-            for xid in self.txn_map.get_xid_list():
-                if xid in self.xid_prepared_list:
-                    print '  Committing', Utils.format_xid(xid)
-                    self.txn_map.commit(xid)
+    def reconcile_transactions(self, prepared_list, txn_flag):
+        xid_list = self.txn_map.get_xid_list()
+        if len(xid_list) > 0:
+            print self.queue_name, 'contains', len(xid_list), 'open transaction(s):'
+        for xid in xid_list:
+            if xid in prepared_list.keys():
+                commit_flag = prepared_list[xid]
+                if commit_flag is None:
+                    print ' ', Utils.format_xid(xid), '- Assuming commit after prepare'
+                    if txn_flag:
+                        self.txn_map.commit(xid)
+                elif commit_flag:
+                    print ' ', Utils.format_xid(xid), '- Completing interrupted commit operation'
+                    if txn_flag:
+                        self.txn_map.commit(xid)
                 else:
-                    print '  Aborting', Utils.format_xid(xid)
+                    print ' ', Utils.format_xid(xid), '- Completing interrupted abort operation'
+                    if txn_flag:
+                        self.txn_map.abort(xid)
+            else:
+                print '  ', Utils.format_xid(xid), '- Aborting, not in prepared transaction list'
+                if txn_flag:
                     self.txn_map.abort(xid)
     def report(self, print_stats_flag):
         print 'Journal "%s":' % self.queue_name
@@ -287,13 +372,9 @@ class Journal(object):
             print str(self.statistics)
         print self.enq_map.report_str(True, True)
         print self.txn_map.report_str(True, True)
-        print 'file_num p_no   efp journal_file'
-        print '-------- ---- ----- ------------'
+        JournalFile.report_header()
         for file_num in sorted(self.files.keys()):
-            file_hdr = self.files[file_num]
-            comment = '<uninitialized>' if file_hdr.file_num == 0 else ''
-            print '%8d %4d %4dk %s %s' % (file_num, file_hdr.partition_num, file_hdr.efp_data_size_kb,
-                                          os.path.basename(file_hdr.file_handle.name), comment)
+            self.files[file_num].report()
         print
     #--- protected functions ---
     def _analyze_files(self):
@@ -305,22 +386,26 @@ class Journal(object):
                 args = Utils.load_args(file_handle, RecordHeader)
                 file_hdr = FileHeader(*args)
                 file_hdr.init(file_handle, *Utils.load_args(file_handle, FileHeader))
-                if not file_hdr.is_valid(file_hdr):
+                if not file_hdr.is_header_valid(file_hdr):
                     break
                 file_hdr.load(file_handle)
+                if not file_hdr.is_valid():
+                    break
                 Utils.skip(file_handle, file_hdr.file_header_size_sblks * Utils.SBLK_SIZE)
-                self.files[file_hdr.file_num] = file_hdr
+                self.files[file_hdr.file_num] = JournalFile(file_hdr)
         self.file_num_list = sorted(self.files.keys())
         self.file_num_itr = iter(self.file_num_list)
     def _check_file(self):
-        if not self.current_file_header is None and not self.current_file_header.is_end_of_file():
+        if self.current_journal_file is not None and not self.current_journal_file.file_header.is_end_of_file():
             return
         self._get_next_file()
-        self.current_file_header.file_handle.seek(self.current_file_header.first_record_offset)
+        fhdr = self.current_journal_file.file_header
+        fhdr.file_handle.seek(fhdr.first_record_offset)
     def _get_next_file(self):
-        if not self.current_file_header is None:
-            if not self.current_file_header.file_handle.closed: # sanity check, should not be necessary
-                self.current_file_header.file_handle.close()
+        if self.current_journal_file is not None:
+            file_handle = self.current_journal_file.file_header.file_handle
+            if not file_handle.closed: # sanity check, should not be necessary
+                file_handle.close()
         file_num = 0
         try:
             while file_num == 0:
@@ -329,18 +414,18 @@ class Journal(object):
             pass
         if file_num == 0:
             raise qls.err.NoMoreFilesInJournalError(self.queue_name)
-        self.current_file_header = self.files[file_num]
+        self.current_journal_file = self.files[file_num]
         self.first_rec_flag = True
-        print self.current_file_header
-        #print '[file_num=0x%x]' % self.current_file_header.file_num #DEBUG
+        print self.current_journal_file.file_header
+        #print '[file_num=0x%x]' % self.current_journal_file.file_num #DEBUG
     def _get_next_record(self, high_rid_counter):
         self._check_file()
-        this_record = Utils.load(self.current_file_header.file_handle, RecordHeader)
-        if not this_record.is_valid(self.current_file_header):
+        this_record = Utils.load(self.current_journal_file.file_header.file_handle, RecordHeader)
+        if not this_record.is_header_valid(self.current_journal_file.file_header):
             return False
         if self.first_rec_flag:
-            if this_record.file_offset != self.current_file_header.first_record_offset:
-                raise qls.err.FirstRecordOffsetMismatchError(self.current_file_header, this_record)
+            if this_record.file_offset != self.current_journal_file.file_header.first_record_offset:
+                raise qls.err.FirstRecordOffsetMismatchError(self.current_journal_file.file_header, this_record)
             self.first_rec_flag = False
         high_rid_counter.check(this_record.record_id)
         self.statistics.total_record_count += 1
@@ -355,65 +440,96 @@ class Journal(object):
             print this_record
         else:
             self.statistics.filler_record_count += 1
-        Utils.skip(self.current_file_header.file_handle, Utils.DBLK_SIZE)
+        Utils.skip(self.current_journal_file.file_header.file_handle, Utils.DBLK_SIZE)
         return True
     def _handle_enqueue_record(self, enqueue_record):
-        start_file_header = self.current_file_header
-        while enqueue_record.load(self.current_file_header.file_handle):
+        start_journal_file = self.current_journal_file
+        while enqueue_record.load(self.current_journal_file.file_header.file_handle):
             self._get_next_file()
+        if not enqueue_record.is_valid(self.current_journal_file):
+            return
         if enqueue_record.is_external() and enqueue_record.data != None:
-            raise qls.err.ExternalDataError(self.current_file_header, enqueue_record)
+            raise qls.err.ExternalDataError(self.current_journal_file.file_header, enqueue_record)
         if enqueue_record.is_transient():
             self.statistics.transient_record_count += 1
             return
         if enqueue_record.xid_size > 0:
-            self.txn_map.add(start_file_header, enqueue_record)
+            self.txn_map.add(start_journal_file, enqueue_record)
             self.statistics.transaction_operation_count += 1
             self.statistics.transaction_record_count += 1
             self.statistics.transaction_enqueue_count += 1
         else:
-            self.enq_map.add(start_file_header, enqueue_record, False)
+            self.enq_map.add(start_journal_file, enqueue_record, False)
+        start_journal_file.incr_enq_cnt()
         self.statistics.enqueue_count += 1
         #print enqueue_record, # DEBUG
     def _handle_dequeue_record(self, dequeue_record):
-        while dequeue_record.load(self.current_file_header.file_handle):
+        start_journal_file = self.current_journal_file
+        while dequeue_record.load(self.current_journal_file.file_header.file_handle):
             self._get_next_file()
+        if not dequeue_record.is_valid(self.current_journal_file):
+            return
         if dequeue_record.xid_size > 0:
             if self.xid_prepared_list is None: # ie this is the TPL
                 dequeue_record.transaction_prepared_list_flag = True
-            self.txn_map.add(self.current_file_header, dequeue_record)
+            elif not self.enq_map.contains(dequeue_record.dequeue_record_id):
+                dequeue_record.warnings.append('NOT IN EMAP') # Only for non-TPL records
+            self.txn_map.add(start_journal_file, dequeue_record)
             self.statistics.transaction_operation_count += 1
             self.statistics.transaction_record_count += 1
             self.statistics.transaction_dequeue_count += 1
         else:
             try:
-                self.enq_map.delete(self.current_file_header, dequeue_record)
+                self.enq_map.delete(start_journal_file, dequeue_record)[0].decr_enq_cnt(dequeue_record)
             except qls.err.RecordIdNotFoundError:
-                pass # TODO: handle missing enqueue warning here
+                dequeue_record.warnings.append('NOT IN EMAP')
         self.statistics.dequeue_count += 1
         #print dequeue_record, # DEBUG
     def _handle_transaction_record(self, transaction_record):
-        while transaction_record.load(self.current_file_header.file_handle):
+        while transaction_record.load(self.current_journal_file.file_header.file_handle):
             self._get_next_file()
+        if not transaction_record.is_valid(self.current_journal_file):
+            return
         if transaction_record.magic[-1] == 'a':
             self.statistics.transaction_abort_count += 1
         else:
             self.statistics.transaction_commit_count += 1
         if self.txn_map.contains(transaction_record.xid):
-            mismatched_rids = self.txn_map.delete(self.current_file_header, transaction_record)
-            if mismatched_rids != None and len(mismatched_rids) > 0:
-                self.warnings.append('WARNING: transactional dequeues not found in enqueue map; rids=%s' %
-                                     mismatched_rids)
+            self.txn_map.delete(self.current_journal_file, transaction_record)
         else:
-            self.warnings.append('WARNING: %s not found in transaction map' % \
-                                 Utils.format_xid(transaction_record.xid))
+            transaction_record.warnings.append('NOT IN TMAP')
 #        if transaction_record.magic[-1] == 'c': # commits only
 #            self._txn_obj_list[hdr.xid] = hdr
         self.statistics.transaction_record_count += 1
         #print transaction_record, # DEBUG
     def _load_data(self, record):
         while not record.is_complete:
-            record.load(self.current_file_header.file_handle)
+            record.load(self.current_journal_file.file_handle)
+
+class JournalFile(object):
+    def __init__(self, file_header):
+        self.file_header = file_header
+        self.enq_cnt = 0
+        self.deq_cnt = 0
+    def incr_enq_cnt(self):
+        self.enq_cnt += 1
+    def decr_enq_cnt(self, record):
+        if self.enq_cnt <= self.deq_cnt:
+            raise qls.err.EnqueueCountUnderflowError(self.file_header, record)
+        self.deq_cnt += 1
+    def get_enq_cnt(self):
+        return self.enq_cnt - self.deq_cnt
+    def is_outstanding_enq(self):
+        return self.enq_cnt > self.deq_cnt
+    @staticmethod
+    def report_header():
+        print 'file_num enq_cnt p_no   efp journal_file'
+        print '-------- ------- ---- ----- ------------'
+    def report(self):
+        comment = '<uninitialized>' if self.file_header.file_num == 0 else ''
+        print '%8d %7d %4d %4dk %s %s' % (self.file_header.file_num, self.get_enq_cnt(), self.file_header.partition_num,
+                                          self.file_header.efp_data_size_kb,
+                                          os.path.basename(self.file_header.file_handle.name), comment)
 
 class RecordHeader(object):
     FORMAT = '<4s2H2Q'
@@ -424,6 +540,7 @@ class RecordHeader(object):
         self.user_flags = user_flags
         self.serial = serial
         self.record_id = record_id
+        self.warnings = []
     def load(self, file_handle):
         pass
     @staticmethod
@@ -433,7 +550,7 @@ class RecordHeader(object):
     def is_empty(self):
         """Return True if this record is empty (ie has a magic of 0x0000"""
         return self.magic == '\x00'*4
-    def is_valid(self, file_header):
+    def is_header_valid(self, file_header):
         """Check that this record is valid"""
         if self.is_empty():
             return False
@@ -447,6 +564,11 @@ class RecordHeader(object):
                 #print #DEBUG
                 return False
         return True
+    def _get_warnings(self):
+        warn_str = ''
+        for warn in self.warnings:
+            warn_str += '<%s>' % warn
+        return warn_str
     def __str__(self):
         """Return string representation of this header"""
         if self.is_empty():
@@ -460,12 +582,13 @@ class RecordHeader(object):
 
 class RecordTail(object):
     FORMAT = '<4sL2Q'
-    def __init__(self, file_handle):
-        self.file_offset = file_handle.tell()
+    def __init__(self, file_handle): # TODO - clumsy, only allows reading from disk. Move all disk stuff to load()
+        self.file_offset = file_handle.tell() if file_handle is not None else 0
         self.complete = False
         self.read_size = struct.calcsize(RecordTail.FORMAT)
-        self.fbin = file_handle.read(self.read_size)
-        if len(self.fbin) >= self.read_size:
+        self.fbin = file_handle.read(self.read_size) if file_handle is not None else None
+        self.valid_flag = None
+        if self.fbin is not None and len(self.fbin) >= self.read_size:
             self.complete = True
             self.xmagic, self.checksum, self.serial, self.record_id = struct.unpack(RecordTail.FORMAT, self.fbin)
     def load(self, file_handle):
@@ -477,14 +600,27 @@ class RecordTail(object):
                 self.xmagic, self.checksum, self.serial, self.record_id = struct.unpack(RecordTail.FORMAT, self.fbin)
     def is_complete(self):
         return self.complete
+    def is_valid(self, record):
+        if self.valid_flag is None:
+            if not self.complete:
+                return False
+            self.valid_flag = Utils.inv_str(self.xmagic) == record.magic and \
+                              self.serial == record.serial and \
+                              self.record_id == record.record_id
+            # TODO: When we can verify the checksum, add this here
+        return self.valid_flag
     def __str__(self):
         """Return a string representation of the this RecordTail instance"""
+        if self.valid_flag is not None:
+            if not self.valid_flag:
+                return '[INVALID RECORD TAIL]'
         magic = Utils.inv_str(self.xmagic)
-        return '[%c cs=0x%x rid=0x%x]' % (magic[-1].upper(), self.checksum, self.record_id)
+        magic_char = magic[-1].upper() if magic[-1] in string.printable else '?'
+        return '[%c cs=0x%08x rid=0x%x]' % (magic_char, self.checksum, self.record_id)
 
 class FileHeader(RecordHeader):
     FORMAT = '<2H4x5QH'
-    def init(self, file_handle, file_offset, file_header_size_sblks, partition_num, efp_data_size_kb,
+    def init(self, file_handle, _, file_header_size_sblks, partition_num, efp_data_size_kb,
              first_record_offset, timestamp_sec, timestamp_ns, file_num, queue_name_len):
         self.file_handle = file_handle
         self.file_header_size_sblks = file_header_size_sblks
@@ -503,6 +639,20 @@ class FileHeader(RecordHeader):
         return (self.file_header_size_sblks * Utils.SBLK_SIZE) + (self.efp_data_size_kb * 1024)
     def is_end_of_file(self):
         return self.file_handle.tell() >= self.get_file_size()
+    def is_valid(self):
+        if not RecordHeader.is_header_valid(self, self):
+            return False
+        if self.file_handle is None or self.file_header_size_sblks == 0 or self.partition_num == 0 or \
+           self.efp_data_size_kb == 0 or self.first_record_offset == 0 or self.timestamp_sec == 0 or \
+           self.timestamp_ns == 0 or self.file_num == 0:
+            return False
+        if self.queue_name_len == 0:
+            return False
+        if self.queue_name is None:
+            return False
+        if len(self.queue_name) != self.queue_name_len:
+            return False
+        return True
     def timestamp_str(self):
         """Get the timestamp of this record in string format"""
         time = gmtime(self.timestamp_sec)
@@ -510,14 +660,16 @@ class FileHeader(RecordHeader):
         return strftime(fstr, time)
     def __str__(self):
         """Return a string representation of the this FileHeader instance"""
-        return '%s fnum=%d fro=0x%08x p=%d s=%dk t=%s' % (RecordHeader.__str__(self), self.file_num, self.first_record_offset,
-                                                          self.partition_num, self.efp_data_size_kb, self.timestamp_str())
+        return '%s fnum=%d fro=0x%08x p=%d s=%dk t=%s %s' % (RecordHeader.__str__(self), self.file_num,
+                                                             self.first_record_offset, self.partition_num,
+                                                             self.efp_data_size_kb, self.timestamp_str(),
+                                                             self._get_warnings())
 
 class EnqueueRecord(RecordHeader):
     FORMAT = '<2Q'
     EXTERNAL_FLAG_MASK = 0x20
     TRANSIENT_FLAG_MASK = 0x10
-    def init(self, file_offset, xid_size, data_size):
+    def init(self, _, xid_size, data_size):
         self.xid_size = xid_size
         self.data_size = data_size
         self.xid = None
@@ -529,6 +681,21 @@ class EnqueueRecord(RecordHeader):
         return self.user_flags & EnqueueRecord.EXTERNAL_FLAG_MASK > 0
     def is_transient(self):
         return self.user_flags & EnqueueRecord.TRANSIENT_FLAG_MASK > 0
+    def is_valid(self, journal_file):
+        if not RecordHeader.is_header_valid(self, journal_file.file_header):
+            return False
+        if not (self.xid_complete and self.data_complete):
+            return False
+        if self.xid_size > 0 and len(self.xid) != self.xid_size:
+            return False
+        if self.data_size > 0 and len(self.data) != self.data_size:
+            return False
+        if self.xid_size > 0 or self.data_size > 0:
+            if self.record_tail is None:
+                return False
+            if not self.record_tail.is_valid(self):
+                return False
+        return True
     def load(self, file_handle):
         """Return True when load is incomplete and must be called again with new file handle"""
         self.xid, self.xid_complete = Utils.load_data(file_handle, self.xid, self.xid_size)
@@ -545,7 +712,10 @@ class EnqueueRecord(RecordHeader):
                 self.record_tail = RecordTail(file_handle)
             elif not self.record_tail.is_complete():
                 self.record_tail.load(file_handle) # Continue loading partially loaded tail
-            return not self.record_tail.is_complete()
+            if self.record_tail.is_complete():
+                self.record_tail.is_valid(self)
+            else:
+                return True
         return False
     def _print_flags(self):
         """Utility function to decode the flags field in the header and print a string representation"""
@@ -566,13 +736,14 @@ class EnqueueRecord(RecordHeader):
             record_tail_str = ''
         else:
             record_tail_str = str(self.record_tail)
-        return '%s %s %s %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size),
-                                   Utils.format_data(self.data_size, self.data), record_tail_str, self._print_flags())
+        return '%s %s %s %s %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size),
+                                      Utils.format_data(self.data_size, self.data), record_tail_str,
+                                      self._print_flags(), self._get_warnings())
 
 class DequeueRecord(RecordHeader):
     FORMAT = '<2Q'
     TXN_COMPLETE_COMMIT_FLAG = 0x10
-    def init(self, file_offset, dequeue_record_id, xid_size):
+    def init(self, _, dequeue_record_id, xid_size):
         self.dequeue_record_id = dequeue_record_id
         self.xid_size = xid_size
         self.transaction_prepared_list_flag = False
@@ -581,6 +752,19 @@ class DequeueRecord(RecordHeader):
         self.record_tail = None
     def is_transaction_complete_commit(self):
         return self.user_flags & DequeueRecord.TXN_COMPLETE_COMMIT_FLAG > 0
+    def is_valid(self, journal_file):
+        if not RecordHeader.is_header_valid(self, journal_file.file_header):
+            return False
+        if self.xid_size > 0:
+            if not self.xid_complete:
+                return False
+            if self.xid_size > 0 and len(self.xid) != self.xid_size:
+                return False
+            if self.record_tail is None:
+                return False
+            if not self.record_tail.is_valid(self):
+                return False
+        return True
     def load(self, file_handle):
         """Return True when load is incomplete and must be called again with new file handle"""
         self.xid, self.xid_complete = Utils.load_data(file_handle, self.xid, self.xid_size)
@@ -591,7 +775,10 @@ class DequeueRecord(RecordHeader):
                 self.record_tail = RecordTail(file_handle)
             elif not self.record_tail.is_complete():
                 self.record_tail.load(file_handle)
-            return not self.record_tail.is_complete()
+            if self.record_tail.is_complete():
+                self.record_tail.is_valid(self)
+            else:
+                return True
         return False
     def _print_flags(self):
         """Utility function to decode the flags field in the header and print a string representation"""
@@ -607,16 +794,27 @@ class DequeueRecord(RecordHeader):
             record_tail_str = ''
         else:
             record_tail_str = str(self.record_tail)
-        return '%s %s drid=0x%x %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size),
-                                          self.dequeue_record_id, record_tail_str, self._print_flags())
+        return '%s %s drid=0x%x %s %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size),
+                                             self.dequeue_record_id, record_tail_str, self._print_flags(),
+                                             self._get_warnings())
 
 class TransactionRecord(RecordHeader):
     FORMAT = '<Q'
-    def init(self, file_offset, xid_size):
+    def init(self, _, xid_size):
         self.xid_size = xid_size
         self.xid = None
         self.xid_complete = False
         self.record_tail = None
+    def is_valid(self, journal_file):
+        if not RecordHeader.is_header_valid(self, journal_file.file_header):
+            return False
+        if not self.xid_complete or len(self.xid) != self.xid_size:
+            return False
+        if self.record_tail is None:
+            return False
+        if not self.record_tail.is_valid(self):
+            return False
+        return True
     def load(self, file_handle):
         """Return True when load is incomplete and must be called again with new file handle"""
         self.xid, self.xid_complete = Utils.load_data(file_handle, self.xid, self.xid_size)
@@ -627,7 +825,10 @@ class TransactionRecord(RecordHeader):
                 self.record_tail = RecordTail(file_handle)
             elif not self.record_tail.is_complete():
                 self.record_tail.load(file_handle)
-            return not self.record_tail.is_complete()
+            if self.record_tail.is_complete():
+                self.record_tail.is_valid(self)
+            else:
+                return True
         return False
     def __str__(self):
         """Return a string representation of the this TransactionRecord instance"""
@@ -635,14 +836,40 @@ class TransactionRecord(RecordHeader):
             record_tail_str = ''
         else:
             record_tail_str = str(self.record_tail)
-        return '%s %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size), record_tail_str)
+        return '%s %s %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size), record_tail_str,
+                                self._get_warnings())
 
 class Utils(object):
     """Class containing utility functions for dealing with the journal"""
     DBLK_SIZE = 128
     RECORD_VERSION = 2
     SBLK_SIZE = 4096
-    __printchars = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ '
+    @staticmethod
+    def create_record(magic, uflags, journal_file, record_id, dequeue_record_id, xid, data):
+        record_class = _CLASSES.get(magic[-1])
+        record = record_class(0, magic, Utils.RECORD_VERSION, uflags, journal_file.file_header.serial, record_id)
+        xid_length = len(xid) if xid is not None else 0
+        if isinstance(record, EnqueueRecord):
+            data_length = len(data) if data is not None else 0
+            record.init(None, xid_length, data_length)
+        elif isinstance(record, DequeueRecord):
+            record.init(None, dequeue_record_id, xid_length)
+        elif isinstance(record, TransactionRecord):
+            record.init(None, xid_length)
+        else:
+            raise qls.err.InvalidClassError(record.__class__.__name__)
+        if xid is not None:
+            record.xid = xid
+            record.xid_complete = True
+        if data is not None:
+            record.data = data
+            record.data_complete = True
+        record.record_tail = RecordTail(None)
+        record.record_tail.xmagic = Utils.inv_str(magic)
+        record.record_tail.checksum = 0 # TODO: when we can calculate checksums, add this here
+        record.record_tail.serial = record.serial
+        record.record_tail.record_id = record.record_id
+        return record
     @staticmethod
     def format_data(dsize, data):
         """Format binary data for printing"""
@@ -727,7 +954,7 @@ class Utils(object):
                 hstr += '\\%02x' % ord(in_str[index])
         return hstr
     @staticmethod
-    def _hex_split_str(in_str, split_size = 50):
+    def _hex_split_str(in_str):#, split_size = 50):
         """Split a hex string into two parts separated by an ellipsis"""
 #        if len(in_str) <= split_size:
 #            return Utils._hex_str(in_str, 0, len(in_str))
@@ -736,7 +963,10 @@ class Utils(object):
     @staticmethod
     def _is_printable(in_str):
         """Return True if in_str in printable; False otherwise."""
-        return in_str.strip(Utils.__printchars) == ''
+        for this_char in in_str:
+            if this_char not in string.printable:
+                return False
+        return True
     @staticmethod
     def _rem_bytes_in_block(file_handle, block_size):
         """Return the remaining bytes in a block"""

Modified: qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qpid_qls_analyze.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qpid_qls_analyze.py?rev=1562466&r1=1562465&r2=1562466&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qpid_qls_analyze.py (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qpid_qls_analyze.py Wed Jan 29 14:48:50 2014
@@ -36,8 +36,9 @@ class QqpdLinearStoreAnalyzer(object):
     def __init__(self):
         self.args = None
         self._process_args()
-        self.efp_manager = efp.EfpManager(self.args.qls_dir)
-        self.jrnl_recovery_mgr = jrnl.JournalRecoveryManager(self.args.qls_dir)
+        self.qls_dir = os.path.abspath(self.args.qls_dir)
+        self.efp_manager = efp.EfpManager(self.qls_dir)
+        self.jrnl_recovery_mgr = jrnl.JournalRecoveryManager(self.qls_dir)
     def _analyze_efp(self):
         self.efp_manager.run(self.args)
     def _analyze_journals(self):



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org