You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2014/01/29 15:48:55 UTC
svn commit: r1562466 [6/6] - in /qpid/branches/java-broker-bdb-ha: ./ qpid/
qpid/cpp/src/ qpid/cpp/src/qpid/client/ qpid/cpp/src/qpid/framing/
qpid/cpp/src/qpid/ha/ qpid/cpp/src/qpid/messaging/amqp/ qpid/cpp/src/tests/
qpid/java/ qpid/java/amqp-1-0-cli...
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java?rev=1562466&r1=1562465&r2=1562466&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SlowMessageStore.java Wed Jan 29 14:48:50 2014
@@ -26,7 +26,7 @@ import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.message.EnqueueableMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.VirtualHost;
@@ -265,7 +265,7 @@ public class SlowMessageStore implements
_underlying = underlying;
}
- public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message)
+ public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
throws AMQStoreException
{
doPreDelay("enqueueMessage");
@@ -273,7 +273,7 @@ public class SlowMessageStore implements
doPostDelay("enqueueMessage");
}
- public void dequeueMessage(TransactionLogResource queue, EnqueableMessage message)
+ public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
throws AMQStoreException
{
doPreDelay("dequeueMessage");
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/PortRestTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/PortRestTest.java?rev=1562466&r1=1562465&r2=1562466&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/PortRestTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/PortRestTest.java Wed Jan 29 14:48:50 2014
@@ -29,6 +29,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import org.apache.qpid.server.BrokerOptions;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
@@ -288,4 +289,38 @@ public class PortRestTest extends QpidRe
Map<String, Object> port = getRestTestHelper().getJsonAsSingletonList("/rest/port/" + portName);
assertEquals("Unexpected auth provider", ANONYMOUS_AUTHENTICATION_PROVIDER, port.get(Port.AUTHENTICATION_PROVIDER));
}
+
+ public void testDefaultAmqpPortIsQuiescedWhenInManagementMode() throws Exception
+ {
+ // restart Broker in management port
+ stopBroker();
+ startBroker(0, true);
+ getRestTestHelper().setUsernameAndPassword(BrokerOptions.MANAGEMENT_MODE_USER_NAME, MANAGEMENT_MODE_PASSWORD);
+
+ String ampqPortName = TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT;
+ Map<String, Object> portData = getRestTestHelper().getJsonAsSingletonList("/rest/port/" + URLDecoder.decode(ampqPortName, "UTF-8"));
+ Asserts.assertPortAttributes(portData, State.QUIESCED);
+ }
+
+ public void testNewPortQuiescedIfPortNumberWasUsed() throws Exception
+ {
+ String ampqPortName = TestBrokerConfiguration.ENTRY_NAME_AMQP_PORT;
+ Map<String, Object> portData = getRestTestHelper().getJsonAsSingletonList("/rest/port/" + URLDecoder.decode(ampqPortName, "UTF-8"));
+ int amqpPort = (Integer)portData.get(Port.PORT);
+
+ int deleteResponseCode = getRestTestHelper().submitRequest("/rest/port/" + ampqPortName, "DELETE", null);
+ assertEquals("Port deletion should be allowed", 200, deleteResponseCode);
+
+ String newPortName = "reused-port";
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ attributes.put(Port.NAME, newPortName);
+ attributes.put(Port.PORT, amqpPort); // reuses port that was previously in use
+ attributes.put(Port.AUTHENTICATION_PROVIDER, TestBrokerConfiguration.ENTRY_NAME_AUTHENTICATION_PROVIDER);
+
+ int responseCode = getRestTestHelper().submitRequest("/rest/port/" + newPortName, "PUT", attributes);
+ assertEquals("Unexpected response code for port creation", 201, responseCode);
+
+ portData = getRestTestHelper().getJsonAsSingletonList("/rest/port/" + URLDecoder.decode(newPortName, "UTF-8"));
+ Asserts.assertPortAttributes(portData, State.QUIESCED);
+ }
}
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java?rev=1562466&r1=1562465&r2=1562466&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java Wed Jan 29 14:48:50 2014
@@ -281,8 +281,7 @@ public class BrokerACLTest extends QpidR
assertPortExists(portName);
}
- // TODO: test disabled until allowing the deletion of active ports outside management mode
- public void DISABLED_testDeletePortAllowed() throws Exception
+ public void testDeletePortAllowed() throws Exception
{
getRestTestHelper().setUsernameAndPassword(ALLOWED_USER, ALLOWED_USER);
Modified: qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java?rev=1562466&r1=1562465&r2=1562466&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/java/systests/src/main/java/org/apache/qpid/test/client/timeouts/SyncWaitDelayTest.java Wed Jan 29 14:48:50 2014
@@ -57,7 +57,7 @@ public class SyncWaitDelayTest extends Q
final String prefix = "virtualhosts.virtualhost." + VIRTUALHOST;
setVirtualHostConfigurationProperty(prefix + ".type", StandardVirtualHostFactory.TYPE);
- setVirtualHostConfigurationProperty(prefix + ".store.class", "org.apache.qpid.server.store.SlowMessageStore");
+ setVirtualHostConfigurationProperty(prefix + ".store.class", org.apache.qpid.server.store.SlowMessageStore.class.getName());
setVirtualHostConfigurationProperty(prefix + ".store.delays.commitTran.post", String.valueOf(POST_COMMIT_DELAY));
super.setUp();
Propchange: qpid/branches/java-broker-bdb-ha/qpid/python/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/python:r1560620-1562452
Modified: qpid/branches/java-broker-bdb-ha/qpid/python/qpid-python-test
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/python/qpid-python-test?rev=1562466&r1=1562465&r2=1562466&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/python/qpid-python-test (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/python/qpid-python-test Wed Jan 29 14:48:50 2014
@@ -633,7 +633,7 @@ if not list_only:
if xmlr:
xmlr.end()
-if failed or skipped:
+if failed:
sys.exit(1)
else:
sys.exit(0)
Modified: qpid/branches/java-broker-bdb-ha/qpid/python/qpid/compat.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/python/qpid/compat.py?rev=1562466&r1=1562465&r2=1562466&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/python/qpid/compat.py (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/python/qpid/compat.py Wed Jan 29 14:48:50 2014
@@ -54,7 +54,8 @@ class BaseWaiter:
def wait(self, timeout=None):
start = time.time()
if timeout is not None:
- while True:
+ ready = False
+ while timeout > 0:
try:
ready, _, _ = select([self], [], [], timeout)
break
Modified: qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qls/efp.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qls/efp.py?rev=1562466&r1=1562465&r2=1562466&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qls/efp.py (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qls/efp.py Wed Jan 29 14:48:50 2014
@@ -40,7 +40,7 @@ class EfpManager(object):
print
for ptn in self.partitions:
ptn.report()
- def run(self, args):
+ def run(self, _):
for dir_entry in os.listdir(self.directory):
try:
efpp = EfpPartition(os.path.join(self.directory, dir_entry))
Modified: qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qls/err.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qls/err.py?rev=1562466&r1=1562465&r2=1562466&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qls/err.py (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qls/err.py Wed Jan 29 14:48:50 2014
@@ -43,7 +43,7 @@ class AlreadyLockedError(QlsRecordError)
def __str__(self):
return 'Transactional operation already locked in TransactionMap: ' + QlsRecordError.__str__(self)
-class DatqaSizeError(QlsError):
+class DataSizeError(QlsError):
"""Error class for Data size mismatch"""
def __init__(self, expected_size, actual_size, data_str):
QlsError.__init__(self)
@@ -61,6 +61,13 @@ class DuplicateRecordIdError(QlsRecordEr
def __str__(self):
return 'Duplicate Record Id in enqueue map: ' + QlsRecordError.__str__(self)
+class EnqueueCountUnderflowError(QlsRecordError):
+ """Attempted to decrement enqueue count past 0"""
+ def __init__(self, file_header, record):
+ QlsRecordError.__init__(self, file_header, record)
+ def __str__(self):
+ return 'Enqueue record count underflow: ' + QlsRecordError.__str__(self)
+
class ExternalDataError(QlsRecordError):
"""Data present in Enqueue record when external data flag is set"""
def __init__(self, file_header, record):
@@ -75,6 +82,13 @@ class FirstRecordOffsetMismatchError(Qls
def __str__(self):
return 'First record offset mismatch: ' + QlsRecordError.__str__(self) + ' expected_offset=0x%x' % \
self.file_header.first_record_offset
+
+class InvalidClassError(QlsError):
+ """Invalid class name or type"""
+ def __init__(self, class_name):
+ self.class_name = class_name
+ def __str__(self):
+ return 'Invalid class name "%s"' % self.class_name
class InvalidEfpDirectoryNameError(QlsError):
"""Invalid EFP directory name - should be NNNNk, where NNNN is a number (of any length)"""
Modified: qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qls/jrnl.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qls/jrnl.py?rev=1562466&r1=1562465&r2=1562466&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qls/jrnl.py (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qls/jrnl.py Wed Jan 29 14:48:50 2014
@@ -20,6 +20,7 @@
import os
import os.path
import qls.err
+import string
import struct
from time import gmtime, strftime
@@ -46,25 +47,55 @@ class JournalRecoveryManager(object):
self.journals = {}
self.high_rid_counter = HighCounter()
def report(self, print_stats_flag):
- if not self.tpl is None:
+ if self.tpl is not None:
self.tpl.report(print_stats_flag)
for queue_name in sorted(self.journals.keys()):
self.journals[queue_name].report(print_stats_flag)
def run(self, args):
tpl_dir = os.path.join(self.directory, JournalRecoveryManager.TPL_DIR_NAME)
if os.path.exists(tpl_dir):
- self.tpl = Journal(os.path.join(self.directory, tpl_dir), None)
+ self.tpl = Journal(tpl_dir, None)
self.tpl.recover(self.high_rid_counter)
print
jrnl_dir = os.path.join(self.directory, JournalRecoveryManager.JRNL_DIR_NAME)
+ prepared_list = self.tpl.txn_map.get_prepared_list()
if os.path.exists(jrnl_dir):
- for dir_entry in os.listdir(jrnl_dir):
- jrnl = Journal(os.path.join(jrnl_dir, dir_entry), self.tpl.get_outstanding_transaction_list())
+ for dir_entry in sorted(os.listdir(jrnl_dir)):
+ jrnl = Journal(os.path.join(jrnl_dir, dir_entry), prepared_list)
jrnl.recover(self.high_rid_counter)
self.journals[jrnl.get_queue_name()] = jrnl
- if args.txn:
- jrnl.reconcile_transactions(self.high_rid_counter)
print
+ self._reconcile_transactions(prepared_list, args.txn)
+ def _reconcile_transactions(self, prepared_list, txn_flag):
+ print 'Transaction reconciliation report:'
+ print len(prepared_list), 'open transaction(s) found in prepared transaction list:'
+ for xid in prepared_list.keys():
+ commit_flag = prepared_list[xid]
+ if commit_flag is None:
+ status = '[Prepared, neither committed nor aborted - assuming commit]'
+ elif commit_flag:
+ status = '[Prepared, but interrupted during commit phase]'
+ else:
+ status = '[Prepared, but interrupted during abort phase]'
+ print ' ', Utils.format_xid(xid), status
+ if prepared_list[xid] is None: # Prepared, but not committed or aborted
+ enqueue_record = self.tpl.get_txn_map_record(xid)
+ dequeue_record = Utils.create_record('QLSd', DequeueRecord.TXN_COMPLETE_COMMIT_FLAG, \
+ self.tpl.current_journal_file, self.high_rid_counter.get_next(), \
+ enqueue_record.record_id, xid, None)
+ if txn_flag:
+ self.tpl.add_record(dequeue_record)
+ for queue_name in sorted(self.journals.keys()):
+ self.journals[queue_name].reconcile_transactions(prepared_list, txn_flag)
+ if len(prepared_list) > 0:
+ print 'Completing prepared transactions in prepared transaction list:'
+ for xid in prepared_list.keys():
+ print ' ', Utils.format_xid(xid)
+ transaction_record = Utils.create_record('QLSc', 0, self.tpl.current_journal_file, \
+ self.high_rid_counter.get_next(), None, xid, None)
+ if txn_flag:
+ self.tpl.add_record(transaction_record)
+ print
class EnqueueMap(object):
"""
@@ -73,23 +104,29 @@ class EnqueueMap(object):
def __init__(self, journal):
self.journal = journal
self.enq_map = {}
- def add(self, file_header, enq_record, locked_flag):
+ def add(self, journal_file, enq_record, locked_flag):
if enq_record.record_id in self.enq_map:
raise qls.err.DuplicateRecordIdError(self.journal.current_file_header, enq_record)
- self.enq_map[enq_record.record_id] = [file_header.file_num, enq_record, locked_flag]
+ self.enq_map[enq_record.record_id] = [journal_file, enq_record, locked_flag]
def contains(self, rid):
"""Return True if the map contains the given rid"""
return rid in self.enq_map
- def delete(self, file_header, deq_record):
+ def delete(self, journal_file, deq_record):
if deq_record.dequeue_record_id in self.enq_map:
+ enq_list = self.enq_map[deq_record.dequeue_record_id]
del self.enq_map[deq_record.dequeue_record_id]
+ return enq_list
else:
- raise qls.err.RecordIdNotFoundError(file_header, deq_record)
- def lock(self, file_header, dequeue_record):
- if not dequeue_record.dequeue_record_id in self.enq_map:
- raise qls.err.RecordIdNotFoundError(file_header, dequeue_record)
+ raise qls.err.RecordIdNotFoundError(journal_file.file_header, deq_record)
+ def get(self, record_id):
+ if record_id in self.enq_map:
+ return self.enq_map[record_id]
+ return None
+ def lock(self, journal_file, dequeue_record):
+ if dequeue_record.dequeue_record_id not in self.enq_map:
+ raise qls.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record)
self.enq_map[dequeue_record.dequeue_record_id][2] = True
- def report_str(self, show_stats, show_records):
+ def report_str(self, _, show_records):
"""Return a string containing a text report for all records in the map"""
if len(self.enq_map) == 0:
return 'No enqueued records found.'
@@ -99,24 +136,24 @@ class EnqueueMap(object):
rid_list = self.enq_map.keys()
rid_list.sort()
for rid in rid_list:
- file_num, record, locked_flag = self.enq_map[rid]
+ journal_file, record, locked_flag = self.enq_map[rid]
if locked_flag:
lock_str = '[LOCKED]'
else:
lock_str = ''
- rstr += '\n %d:%s %s' % (file_num, record, lock_str)
+ rstr += '\n %d:%s %s' % (journal_file.file_header.file_num, record, lock_str)
else:
rstr += '.'
return rstr
- def unlock(self, file_header, dequeue_record):
+ def unlock(self, journal_file, dequeue_record):
"""Set the transaction lock for a given record_id to False"""
if dequeue_record.dequeue_record_id in self.enq_map:
if self.enq_map[dequeue_record.dequeue_record_id][2]:
self.enq_map[dequeue_record.dequeue_record_id][2] = False
else:
- raise qls.err.RecordNotLockedError(file_header, dequeue_record)
+ raise qls.err.RecordNotLockedError(journal_file.file_header, dequeue_record)
else:
- raise qls.err.RecordIdNotFoundError(file_header, dequeue_record)
+ raise qls.err.RecordIdNotFoundError(journal_file.file_header, dequeue_record)
class TransactionMap(object):
"""
@@ -127,38 +164,40 @@ class TransactionMap(object):
self.enq_map = enq_map
def abort(self, xid):
"""Perform an abort operation for the given xid record"""
- for file_header, record, lock_flag in self.txn_map[xid]:
+ for journal_file, record, _ in self.txn_map[xid]:
if isinstance(record, DequeueRecord):
if self.enq_map.contains(record.dequeue_record_id):
- self.enq_map.unlock(file_header, record)
+ self.enq_map.unlock(journal_file, record)
+ else:
+ journal_file.decr_enq_cnt(record)
del self.txn_map[xid]
- def add(self, file_header, record):
+ def add(self, journal_file, record):
if record.xid is None:
- raise qls.err.NonTransactionalRecordError(file_header, record, 'TransactionMap.add()')
+ raise qls.err.NonTransactionalRecordError(journal_file.file_header, record, 'TransactionMap.add()')
if isinstance(record, DequeueRecord):
try:
- self.enq_map.lock(file_header, record)
+ self.enq_map.lock(journal_file, record)
except qls.err.RecordIdNotFoundError:
# Not in emap, look for rid in tmap - should not happen in practice
txn_op = self._find_record_id(record.xid, record.dequeue_record_id)
if txn_op != None:
if txn_op[2]:
- raise qls.err.AlreadyLockedError(file_header, record)
+ raise qls.err.AlreadyLockedError(journal_file.file_header, record)
txn_op[2] = True
if record.xid in self.txn_map:
- self.txn_map[record.xid].append([file_header, record, False]) # append to existing list
+ self.txn_map[record.xid].append([journal_file, record, False]) # append to existing list
else:
- self.txn_map[record.xid] = [[file_header, record, False]] # create new list
+ self.txn_map[record.xid] = [[journal_file, record, False]] # create new list
def commit(self, xid):
"""Perform a commit operation for the given xid record"""
mismatch_list = []
- for file_header, record, lock in self.txn_map[xid]:
+ for journal_file, record, lock in self.txn_map[xid]:
if isinstance(record, EnqueueRecord):
- self.enq_map.add(file_header, record, lock) # Transfer enq to emap
+ self.enq_map.add(journal_file, record, lock) # Transfer enq to emap
else:
if self.enq_map.contains(record.dequeue_record_id):
- self.enq_map.unlock(file_header, record)
- self.enq_map.delete(file_header, record)
+ self.enq_map.unlock(journal_file, record)
+ self.enq_map.delete(journal_file, record)[0].decr_enq_cnt(record)
else:
mismatch_list.append('0x%x' % record.dequeue_record_id)
del self.txn_map[xid]
@@ -166,27 +205,47 @@ class TransactionMap(object):
def contains(self, xid):
"""Return True if the xid exists in the map; False otherwise"""
return xid in self.txn_map
- def delete(self, file_header, transaction_record):
+ def delete(self, journal_file, transaction_record):
"""Remove a transaction record from the map using either a commit or abort header"""
if transaction_record.magic[-1] == 'c':
return self.commit(transaction_record.xid)
if transaction_record.magic[-1] == 'a':
self.abort(transaction_record.xid)
else:
- raise qls.err.InvalidRecordTypeError(file_header, transaction_record, 'delete from Transaction Map')
+ raise qls.err.InvalidRecordTypeError(journal_file.file_header, transaction_record,
+ 'delete from Transaction Map')
+ def get(self, xid):
+ if xid in self.txn_map:
+ return self.txn_map[xid]
+ return None
+ def get_prepared_list(self):
+ """
+ Prepared list is a map of xid(key) to one of None, True or False. These represent respectively:
+ None: prepared, but neither committed or aborted (interrupted before commit or abort)
+ False: prepared and aborted (interrupted before abort complete)
+ True: prepared and committed (interrupted before commit complete)
+ """
+ prepared_list = {}
+ for xid in self.get_xid_list():
+ for _, record, _ in self.txn_map[xid]:
+ if isinstance(record, EnqueueRecord):
+ prepared_list[xid] = None
+ else:
+ prepared_list[xid] = record.is_transaction_complete_commit()
+ return prepared_list
def get_xid_list(self):
return self.txn_map.keys()
- def report_str(self, show_stats, show_records):
+ def report_str(self, _, show_records):
"""Return a string containing a text report for all records in the map"""
if len(self.txn_map) == 0:
return 'No outstanding transactions found.'
rstr = '%d outstanding transaction(s)' % len(self.txn_map)
if show_records:
rstr += ':'
- for xid, list in self.txn_map.iteritems():
- rstr += '\n %s containing %d operations:' % (Utils.format_xid(xid), len(list))
- for file_header, record, locked_flag in list:
- rstr += '\n %d:%s' % (file_header.file_num, record)
+ for xid, op_list in self.txn_map.iteritems():
+ rstr += '\n %s containing %d operations:' % (Utils.format_xid(xid), len(op_list))
+ for journal_file, record, _ in op_list:
+ rstr += '\n %d:%s' % (journal_file.file_header.file_num, record)
else:
rstr += '.'
return rstr
@@ -202,48 +261,48 @@ class TransactionMap(object):
return txn_op
return None
+class JournalStatistics(object):
+ """Journal statistics"""
+ def __init__(self):
+ self.total_record_count = 0
+ self.transient_record_count = 0
+ self.filler_record_count = 0
+ self.enqueue_count = 0
+ self.dequeue_count = 0
+ self.transaction_record_count = 0
+ self.transaction_enqueue_count = 0
+ self.transaction_dequeue_count = 0
+ self.transaction_commit_count = 0
+ self.transaction_abort_count = 0
+ self.transaction_operation_count = 0
+ def __str__(self):
+ fstr = 'Total record count: %d\n' + \
+ 'Transient record count: %d\n' + \
+ 'Filler_record_count: %d\n' + \
+ 'Enqueue_count: %d\n' + \
+ 'Dequeue_count: %d\n' + \
+ 'Transaction_record_count: %d\n' + \
+ 'Transaction_enqueue_count: %d\n' + \
+ 'Transaction_dequeue_count: %d\n' + \
+ 'Transaction_commit_count: %d\n' + \
+ 'Transaction_abort_count: %d\n' + \
+ 'Transaction_operation_count: %d\n'
+ return fstr % (self.total_record_count,
+ self.transient_record_count,
+ self.filler_record_count,
+ self.enqueue_count,
+ self.dequeue_count,
+ self.transaction_record_count,
+ self.transaction_enqueue_count,
+ self.transaction_dequeue_count,
+ self.transaction_commit_count,
+ self.transaction_abort_count,
+ self.transaction_operation_count)
+
class Journal(object):
"""
Instance of a Qpid Linear Store (QLS) journal.
"""
- class JournalStatistics(object):
- """Journal statistics"""
- def __init__(self):
- self.total_record_count = 0
- self.transient_record_count = 0
- self.filler_record_count = 0
- self.enqueue_count = 0
- self.dequeue_count = 0
- self.transaction_record_count = 0
- self.transaction_enqueue_count = 0
- self.transaction_dequeue_count = 0
- self.transaction_commit_count = 0
- self.transaction_abort_count = 0
- self.transaction_operation_count = 0
- def __str__(self):
- fstr = 'Total record count: %d\n' + \
- 'Transient record count: %d\n' + \
- 'Filler_record_count: %d\n' + \
- 'Enqueue_count: %d\n' + \
- 'Dequeue_count: %d\n' + \
- 'Transaction_record_count: %d\n' + \
- 'Transaction_enqueue_count: %d\n' + \
- 'Transaction_dequeue_count: %d\n' + \
- 'Transaction_commit_count: %d\n' + \
- 'Transaction_abort_count: %d\n' + \
- 'Transaction_operation_count: %d\n'
- return fstr % (self.total_record_count,
- self.transient_record_count,
- self.filler_record_count,
- self.enqueue_count,
- self.dequeue_count,
- self.transaction_record_count,
- self.transaction_enqueue_count,
- self.transaction_dequeue_count,
- self.transaction_commit_count,
- self.transaction_abort_count,
- self.transaction_operation_count)
-
def __init__(self, directory, xid_prepared_list):
self.directory = directory
self.queue_name = os.path.basename(directory)
@@ -252,12 +311,25 @@ class Journal(object):
self.file_num_itr = None
self.enq_map = EnqueueMap(self)
self.txn_map = TransactionMap(self.enq_map)
- self.current_file_header = None
+ self.current_journal_file = None
self.first_rec_flag = None
- self.statistics = Journal.JournalStatistics()
- self.warnings = []
+ self.statistics = JournalStatistics()
self.xid_prepared_list = xid_prepared_list # This is None for the TPL instance only
- def get_outstanding_transaction_list(self):
+ def add_record(self, record):
+ if isinstance(record, EnqueueRecord) or isinstance(record, DequeueRecord):
+ if record.xid_size > 0:
+ self.txn_map.add(self.current_journal_file, record)
+ else:
+ self.enq_map.add(self.current_journal_file, record, False)
+ elif isinstance(record, TransactionRecord):
+ self.txn_map.delete(self.current_journal_file, record)
+ else:
+ raise qls.err.InvalidRecordTypeError(self.current_journal_file, record, 'add to Journal')
+ def get_enq_map_record(self, rid):
+ return self.enq_map.get(rid)
+ def get_txn_map_record(self, xid):
+ return self.txn_map.get(xid)
+ def get_outstanding_txn_list(self):
return self.txn_map.get_xid_list()
def get_queue_name(self):
return self.queue_name
@@ -271,15 +343,28 @@ class Journal(object):
#print '[No more files in journal]' # DEBUG
#print #DEBUG
pass
- def reconcile_transactions(self, high_rid_counter):
- if not self.xid_prepared_list is None: # ie This is not the TPL instance
- print 'Reconcile outstanding prepared transactions:'
- for xid in self.txn_map.get_xid_list():
- if xid in self.xid_prepared_list:
- print ' Committing', Utils.format_xid(xid)
- self.txn_map.commit(xid)
+ def reconcile_transactions(self, prepared_list, txn_flag):
+ xid_list = self.txn_map.get_xid_list()
+ if len(xid_list) > 0:
+ print self.queue_name, 'contains', len(xid_list), 'open transaction(s):'
+ for xid in xid_list:
+ if xid in prepared_list.keys():
+ commit_flag = prepared_list[xid]
+ if commit_flag is None:
+ print ' ', Utils.format_xid(xid), '- Assuming commit after prepare'
+ if txn_flag:
+ self.txn_map.commit(xid)
+ elif commit_flag:
+ print ' ', Utils.format_xid(xid), '- Completing interrupted commit operation'
+ if txn_flag:
+ self.txn_map.commit(xid)
else:
- print ' Aborting', Utils.format_xid(xid)
+ print ' ', Utils.format_xid(xid), '- Completing interrupted abort operation'
+ if txn_flag:
+ self.txn_map.abort(xid)
+ else:
+ print ' ', Utils.format_xid(xid), '- Aborting, not in prepared transaction list'
+ if txn_flag:
self.txn_map.abort(xid)
def report(self, print_stats_flag):
print 'Journal "%s":' % self.queue_name
@@ -287,13 +372,9 @@ class Journal(object):
print str(self.statistics)
print self.enq_map.report_str(True, True)
print self.txn_map.report_str(True, True)
- print 'file_num p_no efp journal_file'
- print '-------- ---- ----- ------------'
+ JournalFile.report_header()
for file_num in sorted(self.files.keys()):
- file_hdr = self.files[file_num]
- comment = '<uninitialized>' if file_hdr.file_num == 0 else ''
- print '%8d %4d %4dk %s %s' % (file_num, file_hdr.partition_num, file_hdr.efp_data_size_kb,
- os.path.basename(file_hdr.file_handle.name), comment)
+ self.files[file_num].report()
print
#--- protected functions ---
def _analyze_files(self):
@@ -305,22 +386,26 @@ class Journal(object):
args = Utils.load_args(file_handle, RecordHeader)
file_hdr = FileHeader(*args)
file_hdr.init(file_handle, *Utils.load_args(file_handle, FileHeader))
- if not file_hdr.is_valid(file_hdr):
+ if not file_hdr.is_header_valid(file_hdr):
break
file_hdr.load(file_handle)
+ if not file_hdr.is_valid():
+ break
Utils.skip(file_handle, file_hdr.file_header_size_sblks * Utils.SBLK_SIZE)
- self.files[file_hdr.file_num] = file_hdr
+ self.files[file_hdr.file_num] = JournalFile(file_hdr)
self.file_num_list = sorted(self.files.keys())
self.file_num_itr = iter(self.file_num_list)
def _check_file(self):
- if not self.current_file_header is None and not self.current_file_header.is_end_of_file():
+ if self.current_journal_file is not None and not self.current_journal_file.file_header.is_end_of_file():
return
self._get_next_file()
- self.current_file_header.file_handle.seek(self.current_file_header.first_record_offset)
+ fhdr = self.current_journal_file.file_header
+ fhdr.file_handle.seek(fhdr.first_record_offset)
def _get_next_file(self):
- if not self.current_file_header is None:
- if not self.current_file_header.file_handle.closed: # sanity check, should not be necessary
- self.current_file_header.file_handle.close()
+ if self.current_journal_file is not None:
+ file_handle = self.current_journal_file.file_header.file_handle
+ if not file_handle.closed: # sanity check, should not be necessary
+ file_handle.close()
file_num = 0
try:
while file_num == 0:
@@ -329,18 +414,18 @@ class Journal(object):
pass
if file_num == 0:
raise qls.err.NoMoreFilesInJournalError(self.queue_name)
- self.current_file_header = self.files[file_num]
+ self.current_journal_file = self.files[file_num]
self.first_rec_flag = True
- print self.current_file_header
- #print '[file_num=0x%x]' % self.current_file_header.file_num #DEBUG
+ print self.current_journal_file.file_header
+ #print '[file_num=0x%x]' % self.current_journal_file.file_num #DEBUG
def _get_next_record(self, high_rid_counter):
self._check_file()
- this_record = Utils.load(self.current_file_header.file_handle, RecordHeader)
- if not this_record.is_valid(self.current_file_header):
+ this_record = Utils.load(self.current_journal_file.file_header.file_handle, RecordHeader)
+ if not this_record.is_header_valid(self.current_journal_file.file_header):
return False
if self.first_rec_flag:
- if this_record.file_offset != self.current_file_header.first_record_offset:
- raise qls.err.FirstRecordOffsetMismatchError(self.current_file_header, this_record)
+ if this_record.file_offset != self.current_journal_file.file_header.first_record_offset:
+ raise qls.err.FirstRecordOffsetMismatchError(self.current_journal_file.file_header, this_record)
self.first_rec_flag = False
high_rid_counter.check(this_record.record_id)
self.statistics.total_record_count += 1
@@ -355,65 +440,96 @@ class Journal(object):
print this_record
else:
self.statistics.filler_record_count += 1
- Utils.skip(self.current_file_header.file_handle, Utils.DBLK_SIZE)
+ Utils.skip(self.current_journal_file.file_header.file_handle, Utils.DBLK_SIZE)
return True
def _handle_enqueue_record(self, enqueue_record):
- start_file_header = self.current_file_header
- while enqueue_record.load(self.current_file_header.file_handle):
+ start_journal_file = self.current_journal_file
+ while enqueue_record.load(self.current_journal_file.file_header.file_handle):
self._get_next_file()
+ if not enqueue_record.is_valid(self.current_journal_file):
+ return
if enqueue_record.is_external() and enqueue_record.data != None:
- raise qls.err.ExternalDataError(self.current_file_header, enqueue_record)
+ raise qls.err.ExternalDataError(self.current_journal_file.file_header, enqueue_record)
if enqueue_record.is_transient():
self.statistics.transient_record_count += 1
return
if enqueue_record.xid_size > 0:
- self.txn_map.add(start_file_header, enqueue_record)
+ self.txn_map.add(start_journal_file, enqueue_record)
self.statistics.transaction_operation_count += 1
self.statistics.transaction_record_count += 1
self.statistics.transaction_enqueue_count += 1
else:
- self.enq_map.add(start_file_header, enqueue_record, False)
+ self.enq_map.add(start_journal_file, enqueue_record, False)
+ start_journal_file.incr_enq_cnt()
self.statistics.enqueue_count += 1
#print enqueue_record, # DEBUG
def _handle_dequeue_record(self, dequeue_record):
- while dequeue_record.load(self.current_file_header.file_handle):
+ start_journal_file = self.current_journal_file
+ while dequeue_record.load(self.current_journal_file.file_header.file_handle):
self._get_next_file()
+ if not dequeue_record.is_valid(self.current_journal_file):
+ return
if dequeue_record.xid_size > 0:
if self.xid_prepared_list is None: # ie this is the TPL
dequeue_record.transaction_prepared_list_flag = True
- self.txn_map.add(self.current_file_header, dequeue_record)
+ elif not self.enq_map.contains(dequeue_record.dequeue_record_id):
+ dequeue_record.warnings.append('NOT IN EMAP') # Only for non-TPL records
+ self.txn_map.add(start_journal_file, dequeue_record)
self.statistics.transaction_operation_count += 1
self.statistics.transaction_record_count += 1
self.statistics.transaction_dequeue_count += 1
else:
try:
- self.enq_map.delete(self.current_file_header, dequeue_record)
+ self.enq_map.delete(start_journal_file, dequeue_record)[0].decr_enq_cnt(dequeue_record)
except qls.err.RecordIdNotFoundError:
- pass # TODO: handle missing enqueue warning here
+ dequeue_record.warnings.append('NOT IN EMAP')
self.statistics.dequeue_count += 1
#print dequeue_record, # DEBUG
def _handle_transaction_record(self, transaction_record):
- while transaction_record.load(self.current_file_header.file_handle):
+ while transaction_record.load(self.current_journal_file.file_header.file_handle):
self._get_next_file()
+ if not transaction_record.is_valid(self.current_journal_file):
+ return
if transaction_record.magic[-1] == 'a':
self.statistics.transaction_abort_count += 1
else:
self.statistics.transaction_commit_count += 1
if self.txn_map.contains(transaction_record.xid):
- mismatched_rids = self.txn_map.delete(self.current_file_header, transaction_record)
- if mismatched_rids != None and len(mismatched_rids) > 0:
- self.warnings.append('WARNING: transactional dequeues not found in enqueue map; rids=%s' %
- mismatched_rids)
+ self.txn_map.delete(self.current_journal_file, transaction_record)
else:
- self.warnings.append('WARNING: %s not found in transaction map' % \
- Utils.format_xid(transaction_record.xid))
+ transaction_record.warnings.append('NOT IN TMAP')
# if transaction_record.magic[-1] == 'c': # commits only
# self._txn_obj_list[hdr.xid] = hdr
self.statistics.transaction_record_count += 1
#print transaction_record, # DEBUG
def _load_data(self, record):
while not record.is_complete:
- record.load(self.current_file_header.file_handle)
+ record.load(self.current_journal_file.file_handle)
+
+class JournalFile(object):
+ def __init__(self, file_header):
+ self.file_header = file_header
+ self.enq_cnt = 0
+ self.deq_cnt = 0
+ def incr_enq_cnt(self):
+ self.enq_cnt += 1
+ def decr_enq_cnt(self, record):
+ if self.enq_cnt <= self.deq_cnt:
+ raise qls.err.EnqueueCountUnderflowError(self.file_header, record)
+ self.deq_cnt += 1
+ def get_enq_cnt(self):
+ return self.enq_cnt - self.deq_cnt
+ def is_outstanding_enq(self):
+ return self.enq_cnt > self.deq_cnt
+ @staticmethod
+ def report_header():
+ print 'file_num enq_cnt p_no efp journal_file'
+ print '-------- ------- ---- ----- ------------'
+ def report(self):
+ comment = '<uninitialized>' if self.file_header.file_num == 0 else ''
+ print '%8d %7d %4d %4dk %s %s' % (self.file_header.file_num, self.get_enq_cnt(), self.file_header.partition_num,
+ self.file_header.efp_data_size_kb,
+ os.path.basename(self.file_header.file_handle.name), comment)
class RecordHeader(object):
FORMAT = '<4s2H2Q'
@@ -424,6 +540,7 @@ class RecordHeader(object):
self.user_flags = user_flags
self.serial = serial
self.record_id = record_id
+ self.warnings = []
def load(self, file_handle):
pass
@staticmethod
@@ -433,7 +550,7 @@ class RecordHeader(object):
def is_empty(self):
"""Return True if this record is empty (ie has a magic of 0x0000"""
return self.magic == '\x00'*4
- def is_valid(self, file_header):
+ def is_header_valid(self, file_header):
"""Check that this record is valid"""
if self.is_empty():
return False
@@ -447,6 +564,11 @@ class RecordHeader(object):
#print #DEBUG
return False
return True
+ def _get_warnings(self):
+ warn_str = ''
+ for warn in self.warnings:
+ warn_str += '<%s>' % warn
+ return warn_str
def __str__(self):
"""Return string representation of this header"""
if self.is_empty():
@@ -460,12 +582,13 @@ class RecordHeader(object):
class RecordTail(object):
FORMAT = '<4sL2Q'
- def __init__(self, file_handle):
- self.file_offset = file_handle.tell()
+ def __init__(self, file_handle): # TODO - clumsy, only allows reading from disk. Move all disk stuff to laod()
+ self.file_offset = file_handle.tell() if file_handle is not None else 0
self.complete = False
self.read_size = struct.calcsize(RecordTail.FORMAT)
- self.fbin = file_handle.read(self.read_size)
- if len(self.fbin) >= self.read_size:
+ self.fbin = file_handle.read(self.read_size) if file_handle is not None else None
+ self.valid_flag = None
+ if self.fbin is not None and len(self.fbin) >= self.read_size:
self.complete = True
self.xmagic, self.checksum, self.serial, self.record_id = struct.unpack(RecordTail.FORMAT, self.fbin)
def load(self, file_handle):
@@ -477,14 +600,27 @@ class RecordTail(object):
self.xmagic, self.checksum, self.serial, self.record_id = struct.unpack(RecordTail.FORMAT, self.fbin)
def is_complete(self):
return self.complete
+ def is_valid(self, record):
+ if self.valid_flag is None:
+ if not self.complete:
+ return False
+ self.valid_flag = Utils.inv_str(self.xmagic) == record.magic and \
+ self.serial == record.serial and \
+ self.record_id == record.record_id
+ # TODO: When we can verify the checksum, add this here
+ return self.valid_flag
def __str__(self):
"""Return a string representation of the this RecordTail instance"""
+ if self.valid_flag is not None:
+ if not self.valid_flag:
+ return '[INVALID RECORD TAIL]'
magic = Utils.inv_str(self.xmagic)
- return '[%c cs=0x%x rid=0x%x]' % (magic[-1].upper(), self.checksum, self.record_id)
+ magic_char = magic[-1].upper() if magic[-1] in string.printable else '?'
+ return '[%c cs=0x%08x rid=0x%x]' % (magic_char, self.checksum, self.record_id)
class FileHeader(RecordHeader):
FORMAT = '<2H4x5QH'
- def init(self, file_handle, file_offset, file_header_size_sblks, partition_num, efp_data_size_kb,
+ def init(self, file_handle, _, file_header_size_sblks, partition_num, efp_data_size_kb,
first_record_offset, timestamp_sec, timestamp_ns, file_num, queue_name_len):
self.file_handle = file_handle
self.file_header_size_sblks = file_header_size_sblks
@@ -503,6 +639,20 @@ class FileHeader(RecordHeader):
return (self.file_header_size_sblks * Utils.SBLK_SIZE) + (self.efp_data_size_kb * 1024)
def is_end_of_file(self):
return self.file_handle.tell() >= self.get_file_size()
+ def is_valid(self):
+ if not RecordHeader.is_header_valid(self, self):
+ return False
+ if self.file_handle is None or self.file_header_size_sblks == 0 or self.partition_num == 0 or \
+ self.efp_data_size_kb == 0 or self.first_record_offset == 0 or self.timestamp_sec == 0 or \
+ self.timestamp_ns == 0 or self.file_num == 0:
+ return False
+ if self.queue_name_len == 0:
+ return False
+ if self.queue_name is None:
+ return False
+ if len(self.queue_name) != self.queue_name_len:
+ return False
+ return True
def timestamp_str(self):
"""Get the timestamp of this record in string format"""
time = gmtime(self.timestamp_sec)
@@ -510,14 +660,16 @@ class FileHeader(RecordHeader):
return strftime(fstr, time)
def __str__(self):
"""Return a string representation of the this FileHeader instance"""
- return '%s fnum=%d fro=0x%08x p=%d s=%dk t=%s' % (RecordHeader.__str__(self), self.file_num, self.first_record_offset,
- self.partition_num, self.efp_data_size_kb, self.timestamp_str())
+ return '%s fnum=%d fro=0x%08x p=%d s=%dk t=%s %s' % (RecordHeader.__str__(self), self.file_num,
+ self.first_record_offset, self.partition_num,
+ self.efp_data_size_kb, self.timestamp_str(),
+ self._get_warnings())
class EnqueueRecord(RecordHeader):
FORMAT = '<2Q'
EXTERNAL_FLAG_MASK = 0x20
TRANSIENT_FLAG_MASK = 0x10
- def init(self, file_offset, xid_size, data_size):
+ def init(self, _, xid_size, data_size):
self.xid_size = xid_size
self.data_size = data_size
self.xid = None
@@ -529,6 +681,21 @@ class EnqueueRecord(RecordHeader):
return self.user_flags & EnqueueRecord.EXTERNAL_FLAG_MASK > 0
def is_transient(self):
return self.user_flags & EnqueueRecord.TRANSIENT_FLAG_MASK > 0
+ def is_valid(self, journal_file):
+ if not RecordHeader.is_header_valid(self, journal_file.file_header):
+ return False
+ if not (self.xid_complete and self.data_complete):
+ return False
+ if self.xid_size > 0 and len(self.xid) != self.xid_size:
+ return False
+ if self.data_size > 0 and len(self.data) != self.data_size:
+ return False
+ if self.xid_size > 0 or self.data_size > 0:
+ if self.record_tail is None:
+ return False
+ if not self.record_tail.is_valid(self):
+ return False
+ return True
def load(self, file_handle):
"""Return True when load is incomplete and must be called again with new file handle"""
self.xid, self.xid_complete = Utils.load_data(file_handle, self.xid, self.xid_size)
@@ -545,7 +712,10 @@ class EnqueueRecord(RecordHeader):
self.record_tail = RecordTail(file_handle)
elif not self.record_tail.is_complete():
self.record_tail.load(file_handle) # Continue loading partially loaded tail
- return not self.record_tail.is_complete()
+ if self.record_tail.is_complete():
+ self.record_tail.is_valid(self)
+ else:
+ return True
return False
def _print_flags(self):
"""Utility function to decode the flags field in the header and print a string representation"""
@@ -566,13 +736,14 @@ class EnqueueRecord(RecordHeader):
record_tail_str = ''
else:
record_tail_str = str(self.record_tail)
- return '%s %s %s %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size),
- Utils.format_data(self.data_size, self.data), record_tail_str, self._print_flags())
+ return '%s %s %s %s %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size),
+ Utils.format_data(self.data_size, self.data), record_tail_str,
+ self._print_flags(), self._get_warnings())
class DequeueRecord(RecordHeader):
FORMAT = '<2Q'
TXN_COMPLETE_COMMIT_FLAG = 0x10
- def init(self, file_offset, dequeue_record_id, xid_size):
+ def init(self, _, dequeue_record_id, xid_size):
self.dequeue_record_id = dequeue_record_id
self.xid_size = xid_size
self.transaction_prepared_list_flag = False
@@ -581,6 +752,19 @@ class DequeueRecord(RecordHeader):
self.record_tail = None
def is_transaction_complete_commit(self):
return self.user_flags & DequeueRecord.TXN_COMPLETE_COMMIT_FLAG > 0
+ def is_valid(self, journal_file):
+ if not RecordHeader.is_header_valid(self, journal_file.file_header):
+ return False
+ if self.xid_size > 0:
+ if not self.xid_complete:
+ return False
+ if self.xid_size > 0 and len(self.xid) != self.xid_size:
+ return False
+ if self.record_tail is None:
+ return False
+ if not self.record_tail.is_valid(self):
+ return False
+ return True
def load(self, file_handle):
"""Return True when load is incomplete and must be called again with new file handle"""
self.xid, self.xid_complete = Utils.load_data(file_handle, self.xid, self.xid_size)
@@ -591,7 +775,10 @@ class DequeueRecord(RecordHeader):
self.record_tail = RecordTail(file_handle)
elif not self.record_tail.is_complete():
self.record_tail.load(file_handle)
- return not self.record_tail.is_complete()
+ if self.record_tail.is_complete():
+ self.record_tail.is_valid(self)
+ else:
+ return True
return False
def _print_flags(self):
"""Utility function to decode the flags field in the header and print a string representation"""
@@ -607,16 +794,27 @@ class DequeueRecord(RecordHeader):
record_tail_str = ''
else:
record_tail_str = str(self.record_tail)
- return '%s %s drid=0x%x %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size),
- self.dequeue_record_id, record_tail_str, self._print_flags())
+ return '%s %s drid=0x%x %s %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size),
+ self.dequeue_record_id, record_tail_str, self._print_flags(),
+ self._get_warnings())
class TransactionRecord(RecordHeader):
FORMAT = '<Q'
- def init(self, file_offset, xid_size):
+ def init(self, _, xid_size):
self.xid_size = xid_size
self.xid = None
self.xid_complete = False
self.record_tail = None
+ def is_valid(self, journal_file):
+ if not RecordHeader.is_header_valid(self, journal_file.file_header):
+ return False
+ if not self.xid_complete or len(self.xid) != self.xid_size:
+ return False
+ if self.record_tail is None:
+ return False
+ if not self.record_tail.is_valid(self):
+ return False
+ return True
def load(self, file_handle):
"""Return True when load is incomplete and must be called again with new file handle"""
self.xid, self.xid_complete = Utils.load_data(file_handle, self.xid, self.xid_size)
@@ -627,7 +825,10 @@ class TransactionRecord(RecordHeader):
self.record_tail = RecordTail(file_handle)
elif not self.record_tail.is_complete():
self.record_tail.load(file_handle)
- return not self.record_tail.is_complete()
+ if self.record_tail.is_complete():
+ self.record_tail.is_valid(self)
+ else:
+ return True
return False
def __str__(self):
"""Return a string representation of the this TransactionRecord instance"""
@@ -635,14 +836,40 @@ class TransactionRecord(RecordHeader):
record_tail_str = ''
else:
record_tail_str = str(self.record_tail)
- return '%s %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size), record_tail_str)
+ return '%s %s %s %s' % (RecordHeader.__str__(self), Utils.format_xid(self.xid, self.xid_size), record_tail_str,
+ self._get_warnings())
class Utils(object):
"""Class containing utility functions for dealing with the journal"""
DBLK_SIZE = 128
RECORD_VERSION = 2
SBLK_SIZE = 4096
- __printchars = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ '
+ @staticmethod
+ def create_record(magic, uflags, journal_file, record_id, dequeue_record_id, xid, data):
+ record_class = _CLASSES.get(magic[-1])
+ record = record_class(0, magic, Utils.RECORD_VERSION, uflags, journal_file.file_header.serial, record_id)
+ xid_length = len(xid) if xid is not None else 0
+ if isinstance(record, EnqueueRecord):
+ data_length = len(data) if data is not None else 0
+ record.init(None, xid_length, data_length)
+ elif isinstance(record, DequeueRecord):
+ record.init(None, dequeue_record_id, xid_length)
+ elif isinstance(record, TransactionRecord):
+ record.init(None, xid_length)
+ else:
+ raise qls.err.InvalidClassError(record.__class__.__name__)
+ if xid is not None:
+ record.xid = xid
+ record.xid_complete = True
+ if data is not None:
+ record.data = data
+ record.data_complete = True
+ record.record_tail = RecordTail(None)
+ record.record_tail.xmagic = Utils.inv_str(magic)
+ record.record_tail.checksum = 0 # TODO: when we can calculate checksums, add this here
+ record.record_tail.serial = record.serial
+ record.record_tail.record_id = record.record_id
+ return record
@staticmethod
def format_data(dsize, data):
"""Format binary data for printing"""
@@ -727,7 +954,7 @@ class Utils(object):
hstr += '\\%02x' % ord(in_str[index])
return hstr
@staticmethod
- def _hex_split_str(in_str, split_size = 50):
+ def _hex_split_str(in_str):#, split_size = 50):
"""Split a hex string into two parts separated by an ellipsis"""
# if len(in_str) <= split_size:
# return Utils._hex_str(in_str, 0, len(in_str))
@@ -736,7 +963,10 @@ class Utils(object):
@staticmethod
def _is_printable(in_str):
"""Return True if in_str in printable; False otherwise."""
- return in_str.strip(Utils.__printchars) == ''
+ for this_char in in_str:
+ if this_char not in string.printable:
+ return False
+ return True
@staticmethod
def _rem_bytes_in_block(file_handle, block_size):
"""Return the remaining bytes in a block"""
Modified: qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qpid_qls_analyze.py
URL: http://svn.apache.org/viewvc/qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qpid_qls_analyze.py?rev=1562466&r1=1562465&r2=1562466&view=diff
==============================================================================
--- qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qpid_qls_analyze.py (original)
+++ qpid/branches/java-broker-bdb-ha/qpid/tools/src/py/qpid_qls_analyze.py Wed Jan 29 14:48:50 2014
@@ -36,8 +36,9 @@ class QqpdLinearStoreAnalyzer(object):
def __init__(self):
self.args = None
self._process_args()
- self.efp_manager = efp.EfpManager(self.args.qls_dir)
- self.jrnl_recovery_mgr = jrnl.JournalRecoveryManager(self.args.qls_dir)
+ self.qls_dir = os.path.abspath(self.args.qls_dir)
+ self.efp_manager = efp.EfpManager(self.qls_dir)
+ self.jrnl_recovery_mgr = jrnl.JournalRecoveryManager(self.qls_dir)
def _analyze_efp(self):
self.efp_manager.run(self.args)
def _analyze_journals(self):
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org