You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@qpid.apache.org by kp...@apache.org on 2014/02/05 16:34:43 UTC

svn commit: r1564808 - in /qpid/trunk/qpid/tools/src/py: qls/efp.py qls/err.py qls/jrnl.py qpid_qls_analyze.py

Author: kpvdr
Date: Wed Feb  5 15:34:42 2014
New Revision: 1564808

URL: http://svn.apache.org/r1564808
Log:
QPID-5362: Bugfixes and enhancements for qpid_qls_analyze

Modified:
    qpid/trunk/qpid/tools/src/py/qls/efp.py
    qpid/trunk/qpid/tools/src/py/qls/err.py
    qpid/trunk/qpid/tools/src/py/qls/jrnl.py
    qpid/trunk/qpid/tools/src/py/qpid_qls_analyze.py

Modified: qpid/trunk/qpid/tools/src/py/qls/efp.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qls/efp.py?rev=1564808&r1=1564807&r2=1564808&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qls/efp.py (original)
+++ qpid/trunk/qpid/tools/src/py/qls/efp.py Wed Feb  5 15:34:42 2014
@@ -26,10 +26,11 @@ class EfpManager(object):
     Top level class to analyze the Qpid Linear Store (QLS) directory for the partitions that make up the
     Empty File Pool (EFP).
     """
-    def __init__(self, directory):
+    def __init__(self, directory, args):
         if not os.path.exists(directory):
             raise qls.err.InvalidQlsDirectoryNameError(directory)
         self.directory = directory
+        self.args = args
         self.partitions = []
     def report(self):
         print 'Found', len(self.partitions), 'partition(s).'

Modified: qpid/trunk/qpid/tools/src/py/qls/err.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qls/err.py?rev=1564808&r1=1564807&r2=1564808&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qls/err.py (original)
+++ qpid/trunk/qpid/tools/src/py/qls/err.py Wed Feb  5 15:34:42 2014
@@ -30,6 +30,16 @@ class QlsRecordError(QlsError):
         QlsError.__init__(self)
         self.file_header = file_header
         self.record = record
+    def get_expected_fro(self):
+        return self.file_header.first_record_offset
+    def get_file_number(self):
+        return self.file_header.file_num
+    def get_queue_name(self):
+        return self.file_header.queue_name
+    def get_record_id(self):
+        return self.record.record_id
+    def get_record_offset(self):
+        return self.record.file_offset
     def __str__(self):
         return 'queue="%s" file_id=0x%x record_offset=0x%x record_id=0x%x' % \
             (self.file_header.queue_name, self.file_header.file_num, self.record.file_offset, self.record.record_id)

Modified: qpid/trunk/qpid/tools/src/py/qls/jrnl.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qls/jrnl.py?rev=1564808&r1=1564807&r2=1564808&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qls/jrnl.py (original)
+++ qpid/trunk/qpid/tools/src/py/qls/jrnl.py Wed Feb  5 15:34:42 2014
@@ -23,6 +23,7 @@ import qls.err
 import string
 import struct
 from time import gmtime, strftime
+import zlib
 
 class HighCounter(object):
     def __init__(self):
@@ -39,10 +40,11 @@ class HighCounter(object):
 class JournalRecoveryManager(object):
     TPL_DIR_NAME = 'tpl'
     JRNL_DIR_NAME = 'jrnl'
-    def __init__(self, directory):
+    def __init__(self, directory, args):
         if not os.path.exists(directory):
             raise qls.err.InvalidQlsDirectoryNameError(directory)
         self.directory = directory
+        self.args = args
         self.tpl = None
         self.journals = {}
         self.high_rid_counter = HighCounter()
@@ -54,20 +56,21 @@ class JournalRecoveryManager(object):
     def run(self, args):
         tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME)
         if os.path.exists(tpl_dir):
-            self.tpl = Journal(tpl_dir, None)
+            self.tpl = Journal(tpl_dir, None, self.args)
             self.tpl.recover(self.high_rid_counter)
             print
         jrnl_dir = os.path.join(self.directory, JournalRecoveryManager.JRNL_DIR_NAME)
-        prepared_list = self.tpl.txn_map.get_prepared_list()
+        prepared_list = self.tpl.txn_map.get_prepared_list() if self.tpl is not None else {}
         if os.path.exists(jrnl_dir):
             for dir_entry in sorted(os.listdir(jrnl_dir)):
-                jrnl = Journal(os.path.join(jrnl_dir, dir_entry), prepared_list)
+                jrnl = Journal(os.path.join(jrnl_dir, dir_entry), prepared_list, self.args)
                 jrnl.recover(self.high_rid_counter)
                 self.journals[jrnl.get_queue_name()] = jrnl
                 print
         self._reconcile_transactions(prepared_list, args.txn)
     def _reconcile_transactions(self, prepared_list, txn_flag):
         print 'Transaction reconciliation report:'
+        print '=================================='
         print len(prepared_list), 'open transaction(s) found in prepared transaction list:'
         for xid in prepared_list.keys():
             commit_flag = prepared_list[xid]
@@ -79,7 +82,7 @@ class JournalRecoveryManager(object):
                 status = '[Prepared, but interrupted during abort phase]'
             print ' ', Utils.format_xid(xid), status
             if prepared_list[xid] is None: # Prepared, but not committed or aborted
-                enqueue_record = self.tpl.get_txn_map_record(xid)
+                enqueue_record = self.tpl.get_txn_map_record(xid)[0][1]
                 dequeue_record = Utils.create_record('QLSd', DequeueRecord.TXN_COMPLETE_COMMIT_FLAG, \
                                                      self.tpl.current_journal_file, self.high_rid_counter.get_next(), \
                                                      enqueue_record.record_id, xid, None)
@@ -141,7 +144,7 @@ class EnqueueMap(object):
                     lock_str = '[LOCKED]'
                 else:
                     lock_str = ''
-                rstr += '\n  %d:%s %s' % (journal_file.file_header.file_num, record, lock_str)
+                rstr += '\n  0x%x:%s %s' % (journal_file.file_header.file_num, record, lock_str)
         else:
             rstr += '.'
         return rstr
@@ -245,7 +248,7 @@ class TransactionMap(object):
             for xid, op_list in self.txn_map.iteritems():
                 rstr += '\n  %s containing %d operations:' % (Utils.format_xid(xid), len(op_list))
                 for journal_file, record, _ in op_list:
-                    rstr += '\n    %d:%s' % (journal_file.file_header.file_num, record)
+                    rstr += '\n    0x%x:%s' % (journal_file.file_header.file_num, record)
         else:
             rstr += '.'
         return rstr
@@ -303,7 +306,7 @@ class Journal(object):
     """
     Instance of a Qpid Linear Store (QLS) journal.
     """
-    def __init__(self, directory, xid_prepared_list):
+    def __init__(self, directory, xid_prepared_list, args):
         self.directory = directory
         self.queue_name = os.path.basename(directory)
         self.files = {}
@@ -315,6 +318,10 @@ class Journal(object):
         self.first_rec_flag = None
         self.statistics = JournalStatistics()
         self.xid_prepared_list = xid_prepared_list # This is None for the TPL instance only
+        self.args = args
+        self.last_record_offset = None # TODO: Move into JournalFile
+        self.num_filler_records_required = None # TODO: Move into JournalFile
+        self.fill_to_offset = None
     def add_record(self, record):
         if isinstance(record, EnqueueRecord) or isinstance(record, DequeueRecord):
             if record.xid_size > 0:
@@ -334,15 +341,18 @@ class Journal(object):
     def get_queue_name(self):
         return self.queue_name
     def recover(self, high_rid_counter):
-        print 'Recovering', self.queue_name #DEBUG
+        print 'Recovering %s' % self.queue_name
         self._analyze_files()
         try:
             while self._get_next_record(high_rid_counter):
                 pass
+            self._check_alignment()
         except qls.err.NoMoreFilesInJournalError:
-            #print '[No more files in journal]' # DEBUG
-            #print #DEBUG
-            pass
+            print 'No more files in journal'
+        except qls.err.FirstRecordOffsetMismatchError as err:
+            print '0x%08x: **** FRO ERROR: queue=\"%s\" fid=0x%x fro actual=0x%08x expected=0x%08x' % \
+            (err.get_expected_fro(), err.get_queue_name(), err.get_file_number(), err.get_record_offset(),
+             err.get_expected_fro())
     def reconcile_transactions(self, prepared_list, txn_flag):
         xid_list = self.txn_map.get_xid_list()
         if len(xid_list) > 0:
@@ -363,11 +373,12 @@ class Journal(object):
                     if txn_flag:
                         self.txn_map.abort(xid)
             else:
-                print '  ', Utils.format_xid(xid), '- Aborting, not in prepared transaction list'
+                print '  ', Utils.format_xid(xid), '- Ignoring, not in prepared transaction list'
                 if txn_flag:
                     self.txn_map.abort(xid)
     def report(self, print_stats_flag):
         print 'Journal "%s":' % self.queue_name
+        print '=' * (11 + len(self.queue_name))
         if print_stats_flag:
             print str(self.statistics)
         print self.enq_map.report_str(True, True)
@@ -375,6 +386,11 @@ class Journal(object):
         JournalFile.report_header()
         for file_num in sorted(self.files.keys()):
             self.files[file_num].report()
+        #TODO: move this to JournalFile, append to file info
+        if self.num_filler_records_required is not None and self.fill_to_offset is not None:
+            print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \
+                  (self.current_journal_file.file_header.file_num, self.last_record_offset,
+                   self.num_filler_records_required, self.fill_to_offset)
         print
     #--- protected functions ---
     def _analyze_files(self):
@@ -386,21 +402,35 @@ class Journal(object):
                 args = Utils.load_args(file_handle, RecordHeader)
                 file_hdr = FileHeader(*args)
                 file_hdr.init(file_handle, *Utils.load_args(file_handle, FileHeader))
-                if not file_hdr.is_header_valid(file_hdr):
-                    break
-                file_hdr.load(file_handle)
-                if not file_hdr.is_valid():
-                    break
-                Utils.skip(file_handle, file_hdr.file_header_size_sblks * Utils.SBLK_SIZE)
-                self.files[file_hdr.file_num] = JournalFile(file_hdr)
+                if file_hdr.is_header_valid(file_hdr):
+                    file_hdr.load(file_handle)
+                    if file_hdr.is_valid():
+                        Utils.skip(file_handle, file_hdr.file_header_size_sblks * Utils.SBLK_SIZE)
+                        self.files[file_hdr.file_num] = JournalFile(file_hdr)
         self.file_num_list = sorted(self.files.keys())
         self.file_num_itr = iter(self.file_num_list)
+    def _check_alignment(self): # TODO: Move into JournalFile
+        remaining_sblks = self.last_record_offset % Utils.SBLK_SIZE
+        if remaining_sblks == 0:
+            self.num_filler_records_required = 0
+        else:
+            self.num_filler_records_required = (Utils.SBLK_SIZE - remaining_sblks) / Utils.DBLK_SIZE
+            self.fill_to_offset = self.last_record_offset + (self.num_filler_records_required * Utils.DBLK_SIZE)
+            if self.args.show_recs or self.args.show_all_recs:
+                print '0x%x:0x%08x: %d filler records required for DBLK alignment to 0x%08x' % \
+                      (self.current_journal_file.file_header.file_num, self.last_record_offset,
+                       self.num_filler_records_required, self.fill_to_offset)
     def _check_file(self):
-        if self.current_journal_file is not None and not self.current_journal_file.file_header.is_end_of_file():
-            return
-        self._get_next_file()
+        if self.current_journal_file is not None:
+            if not self.current_journal_file.file_header.is_end_of_file():
+                return True
+            if self.current_journal_file.file_header.is_end_of_file():
+                self.last_record_offset = self.current_journal_file.file_header.file_handle.tell()
+        if not self._get_next_file():
+            return False
         fhdr = self.current_journal_file.file_header
         fhdr.file_handle.seek(fhdr.first_record_offset)
+        return True
     def _get_next_file(self):
         if self.current_journal_file is not None:
             file_handle = self.current_journal_file.file_header.file_handle
@@ -413,13 +443,16 @@ class Journal(object):
         except StopIteration:
             pass
         if file_num == 0:
-            raise qls.err.NoMoreFilesInJournalError(self.queue_name)
+            return False
         self.current_journal_file = self.files[file_num]
         self.first_rec_flag = True
-        print self.current_journal_file.file_header
-        #print '[file_num=0x%x]' % self.current_journal_file.file_num #DEBUG
+        if self.args.show_recs or self.args.show_all_recs:
+            print '0x%x:%s' % (self.current_journal_file.file_header.file_num, self.current_journal_file.file_header)
+        return True
     def _get_next_record(self, high_rid_counter):
-        self._check_file()
+        if not self._check_file():
+            return False
+        self.last_record_offset = self.current_journal_file.file_header.file_handle.tell()
         this_record = Utils.load(self.current_journal_file.file_header.file_handle, RecordHeader)
         if not this_record.is_header_valid(self.current_journal_file.file_header):
             return False
@@ -427,32 +460,42 @@ class Journal(object):
             if this_record.file_offset != self.current_journal_file.file_header.first_record_offset:
                 raise qls.err.FirstRecordOffsetMismatchError(self.current_journal_file.file_header, this_record)
             self.first_rec_flag = False
-        high_rid_counter.check(this_record.record_id)
         self.statistics.total_record_count += 1
+        start_journal_file = self.current_journal_file
         if isinstance(this_record, EnqueueRecord):
-            self._handle_enqueue_record(this_record)
-            print this_record
+            ok_flag = self._handle_enqueue_record(this_record, start_journal_file)
+            high_rid_counter.check(this_record.record_id)
+            if self.args.show_recs or self.args.show_all_recs:
+                print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record)
         elif isinstance(this_record, DequeueRecord):
-            self._handle_dequeue_record(this_record)
-            print this_record
+            ok_flag = self._handle_dequeue_record(this_record, start_journal_file)
+            high_rid_counter.check(this_record.record_id)
+            if self.args.show_recs or self.args.show_all_recs:
+                print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record)
         elif isinstance(this_record, TransactionRecord):
-            self._handle_transaction_record(this_record)
-            print this_record
+            ok_flag = self._handle_transaction_record(this_record, start_journal_file)
+            high_rid_counter.check(this_record.record_id)
+            if self.args.show_recs or self.args.show_all_recs:
+                print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record)
         else:
             self.statistics.filler_record_count += 1
+            ok_flag = True
+            if self.args.show_all_recs:
+                print '0x%x:%s' % (start_journal_file.file_header.file_num, this_record)
         Utils.skip(self.current_journal_file.file_header.file_handle, Utils.DBLK_SIZE)
-        return True
-    def _handle_enqueue_record(self, enqueue_record):
-        start_journal_file = self.current_journal_file
+        return ok_flag
+    def _handle_enqueue_record(self, enqueue_record, start_journal_file):
         while enqueue_record.load(self.current_journal_file.file_header.file_handle):
-            self._get_next_file()
-        if not enqueue_record.is_valid(self.current_journal_file):
-            return
+            if not self._get_next_file():
+                enqueue_record.truncated_flag = True
+                return False
+        if not enqueue_record.is_valid(start_journal_file):
+            return False
         if enqueue_record.is_external() and enqueue_record.data != None:
             raise qls.err.ExternalDataError(self.current_journal_file.file_header, enqueue_record)
         if enqueue_record.is_transient():
             self.statistics.transient_record_count += 1
-            return
+            return True
         if enqueue_record.xid_size > 0:
             self.txn_map.add(start_journal_file, enqueue_record)
             self.statistics.transaction_operation_count += 1
@@ -462,13 +505,14 @@ class Journal(object):
             self.enq_map.add(start_journal_file, enqueue_record, False)
         start_journal_file.incr_enq_cnt()
         self.statistics.enqueue_count += 1
-        #print enqueue_record, # DEBUG
-    def _handle_dequeue_record(self, dequeue_record):
-        start_journal_file = self.current_journal_file
+        return True
+    def _handle_dequeue_record(self, dequeue_record, start_journal_file):
         while dequeue_record.load(self.current_journal_file.file_header.file_handle):
-            self._get_next_file()
-        if not dequeue_record.is_valid(self.current_journal_file):
-            return
+            if not self._get_next_file():
+                dequeue_record.truncated_flag = True
+                return False
+        if not dequeue_record.is_valid(start_journal_file):
+            return False
         if dequeue_record.xid_size > 0:
             if self.xid_prepared_list is None: # ie this is the TPL
                 dequeue_record.transaction_prepared_list_flag = True
@@ -484,12 +528,14 @@ class Journal(object):
             except qls.err.RecordIdNotFoundError:
                 dequeue_record.warnings.append('NOT IN EMAP')
         self.statistics.dequeue_count += 1
-        #print dequeue_record, # DEBUG
-    def _handle_transaction_record(self, transaction_record):
+        return True
+    def _handle_transaction_record(self, transaction_record, start_journal_file):
         while transaction_record.load(self.current_journal_file.file_header.file_handle):
-            self._get_next_file()
-        if not transaction_record.is_valid(self.current_journal_file):
-            return
+            if not self._get_next_file():
+                transaction_record.truncated_flag = True
+                return False
+        if not transaction_record.is_valid(start_journal_file):
+            return False
         if transaction_record.magic[-1] == 'a':
             self.statistics.transaction_abort_count += 1
         else:
@@ -501,7 +547,7 @@ class Journal(object):
 #        if transaction_record.magic[-1] == 'c': # commits only
 #            self._txn_obj_list[hdr.xid] = hdr
         self.statistics.transaction_record_count += 1
-        #print transaction_record, # DEBUG
+        return True
     def _load_data(self, record):
         while not record.is_complete:
             record.load(self.current_journal_file.file_handle)
@@ -511,6 +557,7 @@ class JournalFile(object):
         self.file_header = file_header
         self.enq_cnt = 0
         self.deq_cnt = 0
+        self.num_filler_records_required = None
     def incr_enq_cnt(self):
         self.enq_cnt += 1
     def decr_enq_cnt(self, record):
@@ -527,7 +574,8 @@ class JournalFile(object):
         print '-------- ------- ---- ----- ------------'
     def report(self):
         comment = '<uninitialized>' if self.file_header.file_num == 0 else ''
-        print '%8d %7d %4d %4dk %s %s' % (self.file_header.file_num, self.get_enq_cnt(), self.file_header.partition_num,
+        file_num_str = '0x%x' % self.file_header.file_num
+        print '%8s %7d %4d %4dk %s %s' % (file_num_str, self.get_enq_cnt(), self.file_header.partition_num,
                                           self.file_header.efp_data_size_kb,
                                           os.path.basename(self.file_header.file_handle.name), comment)
 
@@ -541,6 +589,9 @@ class RecordHeader(object):
         self.serial = serial
         self.record_id = record_id
         self.warnings = []
+        self.truncated_flag = False
+    def checksum_encode(self):
+        return struct.pack(RecordHeader.FORMAT, self.magic, self.version, self.user_flags, self.serial, self.record_id)
     def load(self, file_handle):
         pass
     @staticmethod
@@ -560,8 +611,6 @@ class RecordHeader(object):
             if self.version != Utils.RECORD_VERSION:
                 raise qls.err.InvalidRecordVersionError(file_header, self, Utils.RECORD_VERSION)
             if self.serial != file_header.serial:
-                #print '[serial mismatch at 0x%x]' % self.file_offset #DEBUG
-                #print #DEBUG
                 return False
         return True
     def _get_warnings(self):
@@ -606,8 +655,8 @@ class RecordTail(object):
                 return False
             self.valid_flag = Utils.inv_str(self.xmagic) == record.magic and \
                               self.serial == record.serial and \
-                              self.record_id == record.record_id
-            # TODO: When we can verify the checksum, add this here
+                              self.record_id == record.record_id and \
+                              Utils.adler32(record.checksum_encode()) == self.checksum
         return self.valid_flag
     def __str__(self):
         """Return a string representation of the this RecordTail instance"""
@@ -660,7 +709,7 @@ class FileHeader(RecordHeader):
         return strftime(fstr, time)
     def __str__(self):
         """Return a string representation of the this FileHeader instance"""
-        return '%s fnum=%d fro=0x%08x p=%d s=%dk t=%s %s' % (RecordHeader.__str__(self), self.file_num,
+        return '%s fnum=0x%x fro=0x%08x p=%d s=%dk t=%s %s' % (RecordHeader.__str__(self), self.file_num,
                                                              self.first_record_offset, self.partition_num,
                                                              self.efp_data_size_kb, self.timestamp_str(),
                                                              self._get_warnings())
@@ -677,6 +726,9 @@ class EnqueueRecord(RecordHeader):
         self.data = None
         self.data_complete = False
         self.record_tail = None
+    def checksum_encode(self):
+        return RecordHeader.checksum_encode(self) + struct.pack(self.FORMAT, self.xid_size, self.data_size) + \
+            self.xid + self.data
     def is_external(self):
         return self.user_flags & EnqueueRecord.EXTERNAL_FLAG_MASK > 0
     def is_transient(self):
@@ -732,6 +784,9 @@ class EnqueueRecord(RecordHeader):
         return fstr
     def __str__(self):
         """Return a string representation of the this EnqueueRecord instance"""
+        if self.truncated_flag:
+            return '%s xid(%d) data(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self),
+                                                                                  self.xid_size, self.data_size)
         if self.record_tail is None:
             record_tail_str = ''
         else:
@@ -750,6 +805,9 @@ class DequeueRecord(RecordHeader):
         self.xid = None
         self.xid_complete = False
         self.record_tail = None
+    def checksum_encode(self):
+        return RecordHeader.checksum_encode(self) + struct.pack(self.FORMAT, self.dequeue_record_id, self.xid_size) + \
+            self.xid
     def is_transaction_complete_commit(self):
         return self.user_flags & DequeueRecord.TXN_COMPLETE_COMMIT_FLAG > 0
     def is_valid(self, journal_file):
@@ -790,6 +848,10 @@ class DequeueRecord(RecordHeader):
         return ''
     def __str__(self):
         """Return a string representation of the this DequeueRecord instance"""
+        if self.truncated_flag:
+            return '%s xid(%d) drid=0x%x [Truncated, no more files in journal]' % (RecordHeader.__str__(self),
+                                                                                   self.xid_size,
+                                                                                   self.dequeue_record_id)
         if self.record_tail is None:
             record_tail_str = ''
         else:
@@ -805,6 +867,8 @@ class TransactionRecord(RecordHeader):
         self.xid = None
         self.xid_complete = False
         self.record_tail = None
+    def checksum_encode(self):
+        return RecordHeader.checksum_encode(self) + struct.pack(self.FORMAT, self.xid_size) + self.xid
     def is_valid(self, journal_file):
         if not RecordHeader.is_header_valid(self, journal_file.file_header):
             return False
@@ -832,6 +896,8 @@ class TransactionRecord(RecordHeader):
         return False
     def __str__(self):
         """Return a string representation of the this TransactionRecord instance"""
+        if self.truncated_flag:
+            return '%s xid(%d) [Truncated, no more files in journal]' % (RecordHeader.__str__(self), self.xid_size)
         if self.record_tail is None:
             record_tail_str = ''
         else:
@@ -845,6 +911,9 @@ class Utils(object):
     RECORD_VERSION = 2
     SBLK_SIZE = 4096
     @staticmethod
+    def adler32(data):
+        return zlib.adler32(data) & 0xffffffff
+    @staticmethod
     def create_record(magic, uflags, journal_file, record_id, dequeue_record_id, xid, data):
         record_class = _CLASSES.get(magic[-1])
         record = record_class(0, magic, Utils.RECORD_VERSION, uflags, journal_file.file_header.serial, record_id)
@@ -866,7 +935,7 @@ class Utils(object):
             record.data_complete = True
         record.record_tail = RecordTail(None)
         record.record_tail.xmagic = Utils.inv_str(magic)
-        record.record_tail.checksum = 0 # TODO: when we can calculate checksums, add this here
+        record.record_tail.checksum = Utils.adler32(record.checksum_encode())
         record.record_tail.serial = record.serial
         record.record_tail.record_id = record.record_id
         return record
@@ -878,7 +947,7 @@ class Utils(object):
         # << DEBUG >>
         begin = data.find('msg')
         end = data.find('\0', begin)
-        return 'data="%s"' % data[begin:end]
+        return 'data(%d)="%s"' % (dsize, data[begin:end])
         # << END DEBUG
         if Utils._is_printable(data):
             datastr = Utils._split_str(data)
@@ -941,7 +1010,8 @@ class Utils(object):
     @staticmethod
     def skip(file_handle, boundary):
         """Read and discard disk bytes until the next multiple of boundary"""
-        file_handle.read(Utils._rem_bytes_in_block(file_handle, boundary))
+        if not file_handle.closed:
+            file_handle.read(Utils._rem_bytes_in_block(file_handle, boundary))
     #--- protected functions ---
     @staticmethod
     def _hex_str(in_str, begin, end):

Modified: qpid/trunk/qpid/tools/src/py/qpid_qls_analyze.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid_qls_analyze.py?rev=1564808&r1=1564807&r2=1564808&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid_qls_analyze.py (original)
+++ qpid/trunk/qpid/tools/src/py/qpid_qls_analyze.py Wed Feb  5 15:34:42 2014
@@ -37,8 +37,8 @@ class QqpdLinearStoreAnalyzer(object):
         self.args = None
         self._process_args()
         self.qls_dir = os.path.abspath(self.args.qls_dir)
-        self.efp_manager = efp.EfpManager(self.qls_dir)
-        self.jrnl_recovery_mgr = jrnl.JournalRecoveryManager(self.qls_dir)
+        self.efp_manager = efp.EfpManager(self.qls_dir, self.args)
+        self.jrnl_recovery_mgr = jrnl.JournalRecoveryManager(self.qls_dir, self.args)
     def _analyze_efp(self):
         self.efp_manager.run(self.args)
     def _analyze_journals(self):
@@ -49,6 +49,10 @@ class QqpdLinearStoreAnalyzer(object):
                             help='Qpid Linear Store (QLS) directory to be analyzed')
         parser.add_argument('--efp', action='store_true',
                             help='Analyze the Emtpy File Pool (EFP) and show stats')
+        parser.add_argument('--show-recs', action='store_true',
+                            help='Show material records found during recovery')
+        parser.add_argument('--show-all-recs', action='store_true',
+                            help='Show all records (including fillers) found during recovery')
         parser.add_argument('--stats', action='store_true',
                             help='Print journal record stats')
         parser.add_argument('--txn', action='store_true',



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org