You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@allura.apache.org by br...@apache.org on 2013/05/30 16:25:08 UTC
[1/4] git commit: [#4994] Fixed race condition when posting
notifications
Updated Branches:
refs/heads/master 4727a8b98 -> 956b6c5f1
[#4994] Fixed race condition when posting notifications
If the thread that posts the notification takes a while to flush the
session (most easily seen on repo refreshes), then the Task can get
started before the new Notification instance is saved to mongo
Signed-off-by: Cory Johns <cj...@slashdotmedia.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-allura/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-allura/commit/956b6c5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-allura/tree/956b6c5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-allura/diff/956b6c5f
Branch: refs/heads/master
Commit: 956b6c5f19c92b59d61298c8f11f0166b807243b
Parents: f3923bf
Author: Cory Johns <cj...@slashdotmedia.com>
Authored: Wed May 29 22:58:40 2013 +0000
Committer: Cory Johns <cj...@slashdotmedia.com>
Committed: Wed May 29 23:10:23 2013 +0000
----------------------------------------------------------------------
Allura/allura/model/notification.py | 7 ++++++-
1 files changed, 6 insertions(+), 1 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/956b6c5f/Allura/allura/model/notification.py
----------------------------------------------------------------------
diff --git a/Allura/allura/model/notification.py b/Allura/allura/model/notification.py
index ebc0eef..f5c9c6b 100644
--- a/Allura/allura/model/notification.py
+++ b/Allura/allura/model/notification.py
@@ -106,6 +106,8 @@ class Notification(MappedClass):
import allura.tasks.notification_tasks
n = cls._make_notification(artifact, topic, **kw)
if n:
+ # make sure notification is flushed in time for task to process it
+ session(n).flush(n)
allura.tasks.notification_tasks.notify.post(
n._id, artifact.index_id(), topic)
return n
@@ -532,7 +534,10 @@ class Mailbox(MappedClass):
'''
notifications = Notification.query.find(dict(_id={'$in':self.queue}))
notifications = notifications.all()
- log.debug('Firing mailbox %s notifications [%s], found [%s]', str(self._id), ', '.join(self.queue), ', '.join([n._id for n in notifications]))
+ if len(notifications) != len(self.queue):
+ log.error('Mailbox queue error: Mailbox %s queued [%s], found [%s]', str(self._id), ', '.join(self.queue), ', '.join([n._id for n in notifications]))
+ else:
+ log.debug('Firing mailbox %s notifications [%s], found [%s]', str(self._id), ', '.join(self.queue), ', '.join([n._id for n in notifications]))
if self.type == 'direct':
ngroups = defaultdict(list)
for n in notifications:
[3/4] git commit: [#4994] Added more logging when sending
notifications for debugging
Posted by br...@apache.org.
[#4994] Added more logging when sending notifications for debugging
Signed-off-by: Cory Johns <cj...@slashdotmedia.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-allura/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-allura/commit/f3923bf2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-allura/tree/f3923bf2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-allura/diff/f3923bf2
Branch: refs/heads/master
Commit: f3923bf2095afaf2f0c79f8da4f151560727a105
Parents: 9bc64c9
Author: Cory Johns <cj...@slashdotmedia.com>
Authored: Wed May 29 17:28:25 2013 +0000
Committer: Cory Johns <cj...@slashdotmedia.com>
Committed: Wed May 29 23:10:23 2013 +0000
----------------------------------------------------------------------
Allura/allura/model/notification.py | 8 +++++++-
1 files changed, 7 insertions(+), 1 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/f3923bf2/Allura/allura/model/notification.py
----------------------------------------------------------------------
diff --git a/Allura/allura/model/notification.py b/Allura/allura/model/notification.py
index 1ec530a..ebc0eef 100644
--- a/Allura/allura/model/notification.py
+++ b/Allura/allura/model/notification.py
@@ -237,6 +237,7 @@ class Notification(MappedClass):
def send_direct(self, user_id):
user = User.query.get(_id=ObjectId(user_id))
artifact = self.ref.artifact
+ log.debug('Sending direct notification %s to user %s', self._id, user_id)
# Don't send if user doesn't have read perms to the artifact
if user and artifact and \
not security.has_access(artifact, 'read', user)():
@@ -265,6 +266,7 @@ class Notification(MappedClass):
security.has_access(artifact, 'read', user)()
notifications = filter(perm_check, notifications)
+ log.debug('Sending digest of notifications [%s] to user %s', ', '.join([n._id for n in notifications]), user_id)
if reply_to_address is None:
reply_to_address = from_address
text = [ 'Digest of %s' % subject ]
@@ -287,6 +289,7 @@ class Notification(MappedClass):
@classmethod
def send_summary(self, user_id, from_address, subject, notifications):
if not notifications: return
+ log.debug('Sending summary of notifications [%s] to user %s', ', '.join([n._id for n in notifications]), user_id)
text = [ 'Digest of %s' % subject ]
for n in notifications:
text.append('From: %s' % n.from_address)
@@ -453,7 +456,9 @@ class Mailbox(MappedClass):
'artifact_index_id':{'$in':[None, artifact_index_id]},
'topic':{'$in':[None, topic]}
}
- for mbox in cls.query.find(d):
+ mboxes = cls.query.find(d).all()
+ log.debug('Delivering notification %s to mailboxes [%s]', nid, ', '.join([str(m._id) for m in mboxes]))
+ for mbox in mboxes:
try:
mbox.query.update(
{'$push':dict(queue=nid),
@@ -527,6 +532,7 @@ class Mailbox(MappedClass):
'''
notifications = Notification.query.find(dict(_id={'$in':self.queue}))
notifications = notifications.all()
+ log.debug('Firing mailbox %s notifications [%s], found [%s]', str(self._id), ', '.join(self.queue), ', '.join([n._id for n in notifications]))
if self.type == 'direct':
ngroups = defaultdict(list)
for n in notifications:
[2/4] git commit: [#4994] Fixed bug in direct notification
accumulation that could cause missing notifications
Posted by br...@apache.org.
[#4994] Fixed bug in direct notification accumulation that could cause missing notifications
Signed-off-by: Cory Johns <cj...@slashdotmedia.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-allura/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-allura/commit/9bc64c9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-allura/tree/9bc64c9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-allura/diff/9bc64c9b
Branch: refs/heads/master
Commit: 9bc64c9bf99bc272e7b9862c8ff50432e2eb02db
Parents: aef20b8
Author: Cory Johns <cj...@slashdotmedia.com>
Authored: Wed May 29 16:38:28 2013 +0000
Committer: Cory Johns <cj...@slashdotmedia.com>
Committed: Wed May 29 23:10:23 2013 +0000
----------------------------------------------------------------------
Allura/allura/model/notification.py | 2 +-
Allura/allura/tests/model/test_notification.py | 40 +++++++++++++++++++
2 files changed, 41 insertions(+), 1 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/9bc64c9b/Allura/allura/model/notification.py
----------------------------------------------------------------------
diff --git a/Allura/allura/model/notification.py b/Allura/allura/model/notification.py
index 61a5657..1ec530a 100644
--- a/Allura/allura/model/notification.py
+++ b/Allura/allura/model/notification.py
@@ -549,7 +549,7 @@ class Mailbox(MappedClass):
for (subject, from_address, reply_to_address, author_id), ns in ngroups.iteritems():
try:
if len(ns) == 1:
- n.send_direct(self.user_id)
+ ns[0].send_direct(self.user_id)
else:
Notification.send_digest(
self.user_id, from_address, subject, ns, reply_to_address)
http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/9bc64c9b/Allura/allura/tests/model/test_notification.py
----------------------------------------------------------------------
diff --git a/Allura/allura/tests/model/test_notification.py b/Allura/allura/tests/model/test_notification.py
index a894e20..6cdfbbb 100644
--- a/Allura/allura/tests/model/test_notification.py
+++ b/Allura/allura/tests/model/test_notification.py
@@ -17,10 +17,13 @@
import unittest
from datetime import timedelta
+import collections
from pylons import tmpl_context as c, app_globals as g
from nose.tools import assert_equal, assert_in
from ming.orm import ThreadLocalORMSession
+import mock
+import bson
from alluratest.controller import setup_basic_test, setup_global_objects, REGISTRY
from allura import model as M
@@ -258,6 +261,43 @@ class TestSubscriptionTypes(unittest.TestCase):
def _post_notification(self, text=None):
return M.Notification.post(self.pg, 'metadata', text=text)
+ @mock.patch('allura.model.notification.defaultdict')
+ @mock.patch('allura.model.notification.Notification')
+ def test_direct_accumulation(self, mocked_notification, mocked_defaultdict):
+ class OrderedDefaultDict(collections.OrderedDict):
+ def __init__(self, factory=list, *a, **kw):
+ self._factory = factory
+ super(OrderedDefaultDict, self).__init__(*a, **kw)
+
+ def __getitem__(self, key):
+ if key not in self:
+ value = self[key] = self._factory()
+ else:
+ value = super(OrderedDefaultDict, self).__getitem__(key)
+ return value
+
+ notifications = mocked_notification.query.find.return_value.all.return_value = [
+ mock.Mock(_id='n0', topic='metadata', subject='s1', from_address='f1', reply_to_address='rt1', author_id='a1'),
+ mock.Mock(_id='n1', topic='metadata', subject='s2', from_address='f2', reply_to_address='rt2', author_id='a2'),
+ mock.Mock(_id='n2', topic='metadata', subject='s2', from_address='f2', reply_to_address='rt2', author_id='a2'),
+ mock.Mock(_id='n3', topic='message', subject='s3', from_address='f3', reply_to_address='rt3', author_id='a3'),
+ mock.Mock(_id='n4', topic='message', subject='s3', from_address='f3', reply_to_address='rt3', author_id='a3'),
+ ]
+ mocked_defaultdict.side_effect = OrderedDefaultDict
+
+ u0 = bson.ObjectId()
+ mbox = M.Mailbox(type='direct', user_id=u0, queue=['n0', 'n1', 'n2', 'n3', 'n4'])
+ mbox.fire('now')
+
+ mocked_notification.query.find.assert_called_once_with({'_id': {'$in': ['n0', 'n1', 'n2', 'n3', 'n4']}})
+ # first notification should be sent direct, as its key values are unique
+ notifications[0].send_direct.assert_called_once_with(u0)
+ # next two notifications should be sent as a digest as they have matching key values
+ mocked_notification.send_digest.assert_called_once_with(u0, 'f2', 's2', [notifications[1], notifications[2]], 'rt2')
+ # final two should be sent direct even though they matching keys, as they are messages
+ notifications[3].send_direct.assert_called_once_with(u0)
+ notifications[4].send_direct.assert_called_once_with(u0)
+
def _clear_subscriptions():
M.Mailbox.query.remove({})
[4/4] git commit: [#4994] Improved error checking around
notifications to prevent lost notifications
Posted by br...@apache.org.
[#4994] Improved error checking around notifications to prevent lost notifications
Signed-off-by: Cory Johns <cj...@slashdotmedia.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-allura/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-allura/commit/aef20b8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-allura/tree/aef20b8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-allura/diff/aef20b8c
Branch: refs/heads/master
Commit: aef20b8c609832aec814eb5de27fbf617cc81f19
Parents: 4727a8b
Author: Cory Johns <cj...@slashdotmedia.com>
Authored: Wed May 29 15:44:24 2013 +0000
Committer: Cory Johns <cj...@slashdotmedia.com>
Committed: Wed May 29 23:10:23 2013 +0000
----------------------------------------------------------------------
Allura/allura/model/notification.py | 67 +++++++++++++++++++++---------
1 files changed, 47 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/aef20b8c/Allura/allura/model/notification.py
----------------------------------------------------------------------
diff --git a/Allura/allura/model/notification.py b/Allura/allura/model/notification.py
index 3c3517f..61a5657 100644
--- a/Allura/allura/model/notification.py
+++ b/Allura/allura/model/notification.py
@@ -454,13 +454,20 @@ class Mailbox(MappedClass):
'topic':{'$in':[None, topic]}
}
for mbox in cls.query.find(d):
- mbox.query.update(
- {'$push':dict(queue=nid),
- '$set':dict(last_modified=datetime.utcnow(),
- queue_empty=False),
- })
- # Make sure the mbox doesn't stick around to be flush()ed
- session(mbox).expunge(mbox)
+ try:
+ mbox.query.update(
+ {'$push':dict(queue=nid),
+ '$set':dict(last_modified=datetime.utcnow(),
+ queue_empty=False),
+ })
+ # Make sure the mbox doesn't stick around to be flush()ed
+ session(mbox).expunge(mbox)
+ except:
+ # log error but try to keep processing, lest all the other eligible
+ # mboxes for this notification get skipped and lost forever
+ log.exception(
+ 'Error adding notification: %s for artifact %s on project %s to user %s',
+ nid, artifact_index_id, c.project._id, mbox.user_id)
@classmethod
def fire_ready(cls):
@@ -490,7 +497,11 @@ class Mailbox(MappedClass):
new=False)
for mbox in take_while_true(find_and_modify_direct_mbox):
- mbox.fire(now)
+ try:
+ mbox.fire(now)
+ except:
+ log.exception('Error firing mbox: %s with queue: [%s]', str(mbox._id), ', '.join(mbox.queue))
+ raise # re-raise so we don't keep (destructively) trying to process mboxes
for mbox in cls.query.find(q_digest):
next_scheduled = now
@@ -519,20 +530,36 @@ class Mailbox(MappedClass):
if self.type == 'direct':
ngroups = defaultdict(list)
for n in notifications:
- if n.topic == 'message':
- n.send_direct(self.user_id)
- # Messages must be sent individually so they can be replied
- # to individually
- else:
- key = (n.subject, n.from_address, n.reply_to_address, n.author_id)
- ngroups[key].append(n)
+ try:
+ if n.topic == 'message':
+ n.send_direct(self.user_id)
+ # Messages must be sent individually so they can be replied
+ # to individually
+ else:
+ key = (n.subject, n.from_address, n.reply_to_address, n.author_id)
+ ngroups[key].append(n)
+ except:
+ # log error but keep trying to deliver other notifications,
+ # lest the other notifications (which have already been removed
+ # from the mobx's queue in mongo) be lost
+ log.exception(
+ 'Error sending notification: %s to mbox %s (user %s)',
+ n._id, self._id, self.user_id)
# Accumulate messages from same address with same subject
for (subject, from_address, reply_to_address, author_id), ns in ngroups.iteritems():
- if len(ns) == 1:
- n.send_direct(self.user_id)
- else:
- Notification.send_digest(
- self.user_id, from_address, subject, ns, reply_to_address)
+ try:
+ if len(ns) == 1:
+ n.send_direct(self.user_id)
+ else:
+ Notification.send_digest(
+ self.user_id, from_address, subject, ns, reply_to_address)
+ except:
+ # log error but keep trying to deliver other notifications,
+ # lest the other notifications (which have already been removed
+ # from the mobx's queue in mongo) be lost
+ log.exception(
+ 'Error sending notifications: [%s] to mbox %s (user %s)',
+ ', '.join([n._id for n in ns]), self._id, self.user_id)
elif self.type == 'digest':
Notification.send_digest(
self.user_id, u'noreply@in.sf.net', 'Digest Email',