You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@allura.apache.org by br...@apache.org on 2013/05/30 16:25:08 UTC

[1/4] git commit: [#4994] Fixed race condition when posting notifications

Updated Branches:
  refs/heads/master 4727a8b98 -> 956b6c5f1


[#4994] Fixed race condition when posting notifications

If the thread that posts the notification takes a while to flush the
session (most easily seen on repo refreshes), then the Task can get
started before the new Notification instance is saved to mongo

Signed-off-by: Cory Johns <cj...@slashdotmedia.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-allura/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-allura/commit/956b6c5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-allura/tree/956b6c5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-allura/diff/956b6c5f

Branch: refs/heads/master
Commit: 956b6c5f19c92b59d61298c8f11f0166b807243b
Parents: f3923bf
Author: Cory Johns <cj...@slashdotmedia.com>
Authored: Wed May 29 22:58:40 2013 +0000
Committer: Cory Johns <cj...@slashdotmedia.com>
Committed: Wed May 29 23:10:23 2013 +0000

----------------------------------------------------------------------
 Allura/allura/model/notification.py |    7 ++++++-
 1 files changed, 6 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/956b6c5f/Allura/allura/model/notification.py
----------------------------------------------------------------------
diff --git a/Allura/allura/model/notification.py b/Allura/allura/model/notification.py
index ebc0eef..f5c9c6b 100644
--- a/Allura/allura/model/notification.py
+++ b/Allura/allura/model/notification.py
@@ -106,6 +106,8 @@ class Notification(MappedClass):
         import allura.tasks.notification_tasks
         n = cls._make_notification(artifact, topic, **kw)
         if n:
+            # make sure notification is flushed in time for task to process it
+            session(n).flush(n)
             allura.tasks.notification_tasks.notify.post(
                 n._id, artifact.index_id(), topic)
         return n
@@ -532,7 +534,10 @@ class Mailbox(MappedClass):
         '''
         notifications = Notification.query.find(dict(_id={'$in':self.queue}))
         notifications = notifications.all()
-        log.debug('Firing mailbox %s notifications [%s], found [%s]', str(self._id), ', '.join(self.queue), ', '.join([n._id for n in notifications]))
+        if len(notifications) != len(self.queue):
+            log.error('Mailbox queue error: Mailbox %s queued [%s], found [%s]', str(self._id), ', '.join(self.queue), ', '.join([n._id for n in notifications]))
+        else:
+            log.debug('Firing mailbox %s notifications [%s], found [%s]', str(self._id), ', '.join(self.queue), ', '.join([n._id for n in notifications]))
         if self.type == 'direct':
             ngroups = defaultdict(list)
             for n in notifications:


[3/4] git commit: [#4994] Added more logging when sending notifications for debugging

Posted by br...@apache.org.
[#4994] Added more logging when sending notifications for debugging

Signed-off-by: Cory Johns <cj...@slashdotmedia.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-allura/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-allura/commit/f3923bf2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-allura/tree/f3923bf2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-allura/diff/f3923bf2

Branch: refs/heads/master
Commit: f3923bf2095afaf2f0c79f8da4f151560727a105
Parents: 9bc64c9
Author: Cory Johns <cj...@slashdotmedia.com>
Authored: Wed May 29 17:28:25 2013 +0000
Committer: Cory Johns <cj...@slashdotmedia.com>
Committed: Wed May 29 23:10:23 2013 +0000

----------------------------------------------------------------------
 Allura/allura/model/notification.py |    8 +++++++-
 1 files changed, 7 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/f3923bf2/Allura/allura/model/notification.py
----------------------------------------------------------------------
diff --git a/Allura/allura/model/notification.py b/Allura/allura/model/notification.py
index 1ec530a..ebc0eef 100644
--- a/Allura/allura/model/notification.py
+++ b/Allura/allura/model/notification.py
@@ -237,6 +237,7 @@ class Notification(MappedClass):
     def send_direct(self, user_id):
         user = User.query.get(_id=ObjectId(user_id))
         artifact = self.ref.artifact
+        log.debug('Sending direct notification %s to user %s', self._id, user_id)
         # Don't send if user doesn't have read perms to the artifact
         if user and artifact and \
                 not security.has_access(artifact, 'read', user)():
@@ -265,6 +266,7 @@ class Notification(MappedClass):
                     security.has_access(artifact, 'read', user)()
         notifications = filter(perm_check, notifications)
 
+        log.debug('Sending digest of notifications [%s] to user %s', ', '.join([n._id for n in notifications]), user_id)
         if reply_to_address is None:
             reply_to_address = from_address
         text = [ 'Digest of %s' % subject ]
@@ -287,6 +289,7 @@ class Notification(MappedClass):
     @classmethod
     def send_summary(self, user_id, from_address, subject, notifications):
         if not notifications: return
+        log.debug('Sending summary of notifications [%s] to user %s', ', '.join([n._id for n in notifications]), user_id)
         text = [ 'Digest of %s' % subject ]
         for n in notifications:
             text.append('From: %s' % n.from_address)
@@ -453,7 +456,9 @@ class Mailbox(MappedClass):
             'artifact_index_id':{'$in':[None, artifact_index_id]},
             'topic':{'$in':[None, topic]}
             }
-        for mbox in cls.query.find(d):
+        mboxes = cls.query.find(d).all()
+        log.debug('Delivering notification %s to mailboxes [%s]', nid, ', '.join([str(m._id) for m in mboxes]))
+        for mbox in mboxes:
             try:
                 mbox.query.update(
                     {'$push':dict(queue=nid),
@@ -527,6 +532,7 @@ class Mailbox(MappedClass):
         '''
         notifications = Notification.query.find(dict(_id={'$in':self.queue}))
         notifications = notifications.all()
+        log.debug('Firing mailbox %s notifications [%s], found [%s]', str(self._id), ', '.join(self.queue), ', '.join([n._id for n in notifications]))
         if self.type == 'direct':
             ngroups = defaultdict(list)
             for n in notifications:


[2/4] git commit: [#4994] Fixed bug in direct notification accumulation that could cause missing notifications

Posted by br...@apache.org.
[#4994] Fixed bug in direct notification accumulation that could cause missing notifications

Signed-off-by: Cory Johns <cj...@slashdotmedia.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-allura/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-allura/commit/9bc64c9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-allura/tree/9bc64c9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-allura/diff/9bc64c9b

Branch: refs/heads/master
Commit: 9bc64c9bf99bc272e7b9862c8ff50432e2eb02db
Parents: aef20b8
Author: Cory Johns <cj...@slashdotmedia.com>
Authored: Wed May 29 16:38:28 2013 +0000
Committer: Cory Johns <cj...@slashdotmedia.com>
Committed: Wed May 29 23:10:23 2013 +0000

----------------------------------------------------------------------
 Allura/allura/model/notification.py            |    2 +-
 Allura/allura/tests/model/test_notification.py |   40 +++++++++++++++++++
 2 files changed, 41 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/9bc64c9b/Allura/allura/model/notification.py
----------------------------------------------------------------------
diff --git a/Allura/allura/model/notification.py b/Allura/allura/model/notification.py
index 61a5657..1ec530a 100644
--- a/Allura/allura/model/notification.py
+++ b/Allura/allura/model/notification.py
@@ -549,7 +549,7 @@ class Mailbox(MappedClass):
             for (subject, from_address, reply_to_address, author_id), ns in ngroups.iteritems():
                 try:
                     if len(ns) == 1:
-                        n.send_direct(self.user_id)
+                        ns[0].send_direct(self.user_id)
                     else:
                         Notification.send_digest(
                             self.user_id, from_address, subject, ns, reply_to_address)

http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/9bc64c9b/Allura/allura/tests/model/test_notification.py
----------------------------------------------------------------------
diff --git a/Allura/allura/tests/model/test_notification.py b/Allura/allura/tests/model/test_notification.py
index a894e20..6cdfbbb 100644
--- a/Allura/allura/tests/model/test_notification.py
+++ b/Allura/allura/tests/model/test_notification.py
@@ -17,10 +17,13 @@
 
 import unittest
 from datetime import timedelta
+import collections
 
 from pylons import tmpl_context as c, app_globals as g
 from nose.tools import assert_equal, assert_in
 from ming.orm import ThreadLocalORMSession
+import mock
+import bson
 
 from alluratest.controller import setup_basic_test, setup_global_objects, REGISTRY
 from allura import model as M
@@ -258,6 +261,43 @@ class TestSubscriptionTypes(unittest.TestCase):
     def _post_notification(self, text=None):
         return M.Notification.post(self.pg, 'metadata', text=text)
 
+    @mock.patch('allura.model.notification.defaultdict')
+    @mock.patch('allura.model.notification.Notification')
+    def test_direct_accumulation(self, mocked_notification, mocked_defaultdict):
+        class OrderedDefaultDict(collections.OrderedDict):
+            def __init__(self, factory=list, *a, **kw):
+                self._factory = factory
+                super(OrderedDefaultDict, self).__init__(*a, **kw)
+
+            def __getitem__(self, key):
+                if key not in self:
+                    value = self[key] = self._factory()
+                else:
+                    value = super(OrderedDefaultDict, self).__getitem__(key)
+                return value
+
+        notifications = mocked_notification.query.find.return_value.all.return_value = [
+                mock.Mock(_id='n0', topic='metadata', subject='s1', from_address='f1', reply_to_address='rt1', author_id='a1'),
+                mock.Mock(_id='n1', topic='metadata', subject='s2', from_address='f2', reply_to_address='rt2', author_id='a2'),
+                mock.Mock(_id='n2', topic='metadata', subject='s2', from_address='f2', reply_to_address='rt2', author_id='a2'),
+                mock.Mock(_id='n3', topic='message', subject='s3', from_address='f3', reply_to_address='rt3', author_id='a3'),
+                mock.Mock(_id='n4', topic='message', subject='s3', from_address='f3', reply_to_address='rt3', author_id='a3'),
+            ]
+        mocked_defaultdict.side_effect = OrderedDefaultDict
+
+        u0 = bson.ObjectId()
+        mbox = M.Mailbox(type='direct', user_id=u0, queue=['n0', 'n1', 'n2', 'n3', 'n4'])
+        mbox.fire('now')
+
+        mocked_notification.query.find.assert_called_once_with({'_id': {'$in': ['n0', 'n1', 'n2', 'n3', 'n4']}})
+        # first notification should be sent direct, as its key values are unique
+        notifications[0].send_direct.assert_called_once_with(u0)
+        # next two notifications should be sent as a digest as they have matching key values
+        mocked_notification.send_digest.assert_called_once_with(u0, 'f2', 's2', [notifications[1], notifications[2]], 'rt2')
+        # final two should be sent direct even though they matching keys, as they are messages
+        notifications[3].send_direct.assert_called_once_with(u0)
+        notifications[4].send_direct.assert_called_once_with(u0)
+
 def _clear_subscriptions():
         M.Mailbox.query.remove({})
 


[4/4] git commit: [#4994] Improved error checking around notifications to prevent lost notifications

Posted by br...@apache.org.
[#4994] Improved error checking around notifications to prevent lost notifications

Signed-off-by: Cory Johns <cj...@slashdotmedia.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-allura/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-allura/commit/aef20b8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-allura/tree/aef20b8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-allura/diff/aef20b8c

Branch: refs/heads/master
Commit: aef20b8c609832aec814eb5de27fbf617cc81f19
Parents: 4727a8b
Author: Cory Johns <cj...@slashdotmedia.com>
Authored: Wed May 29 15:44:24 2013 +0000
Committer: Cory Johns <cj...@slashdotmedia.com>
Committed: Wed May 29 23:10:23 2013 +0000

----------------------------------------------------------------------
 Allura/allura/model/notification.py |   67 +++++++++++++++++++++---------
 1 files changed, 47 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/aef20b8c/Allura/allura/model/notification.py
----------------------------------------------------------------------
diff --git a/Allura/allura/model/notification.py b/Allura/allura/model/notification.py
index 3c3517f..61a5657 100644
--- a/Allura/allura/model/notification.py
+++ b/Allura/allura/model/notification.py
@@ -454,13 +454,20 @@ class Mailbox(MappedClass):
             'topic':{'$in':[None, topic]}
             }
         for mbox in cls.query.find(d):
-            mbox.query.update(
-                {'$push':dict(queue=nid),
-                 '$set':dict(last_modified=datetime.utcnow(),
-                             queue_empty=False),
-                })
-            # Make sure the mbox doesn't stick around to be flush()ed
-            session(mbox).expunge(mbox)
+            try:
+                mbox.query.update(
+                    {'$push':dict(queue=nid),
+                     '$set':dict(last_modified=datetime.utcnow(),
+                                 queue_empty=False),
+                    })
+                # Make sure the mbox doesn't stick around to be flush()ed
+                session(mbox).expunge(mbox)
+            except:
+                # log error but try to keep processing, lest all the other eligible
+                # mboxes for this notification get skipped and lost forever
+                log.exception(
+                    'Error adding notification: %s for artifact %s on project %s to user %s',
+                    nid, artifact_index_id, c.project._id, mbox.user_id)
 
     @classmethod
     def fire_ready(cls):
@@ -490,7 +497,11 @@ class Mailbox(MappedClass):
                 new=False)
 
         for mbox in take_while_true(find_and_modify_direct_mbox):
-            mbox.fire(now)
+            try:
+                mbox.fire(now)
+            except:
+                log.exception('Error firing mbox: %s with queue: [%s]', str(mbox._id), ', '.join(mbox.queue))
+                raise  # re-raise so we don't keep (destructively) trying to process mboxes
 
         for mbox in cls.query.find(q_digest):
             next_scheduled = now
@@ -519,20 +530,36 @@ class Mailbox(MappedClass):
         if self.type == 'direct':
             ngroups = defaultdict(list)
             for n in notifications:
-                if n.topic == 'message':
-                    n.send_direct(self.user_id)
-                    # Messages must be sent individually so they can be replied
-                    # to individually
-                else:
-                    key = (n.subject, n.from_address, n.reply_to_address, n.author_id)
-                    ngroups[key].append(n)
+                try:
+                    if n.topic == 'message':
+                        n.send_direct(self.user_id)
+                        # Messages must be sent individually so they can be replied
+                        # to individually
+                    else:
+                        key = (n.subject, n.from_address, n.reply_to_address, n.author_id)
+                        ngroups[key].append(n)
+                except:
+                    # log error but keep trying to deliver other notifications,
+                    # lest the other notifications (which have already been removed
+                    # from the mobx's queue in mongo) be lost
+                    log.exception(
+                        'Error sending notification: %s to mbox %s (user %s)',
+                        n._id, self._id, self.user_id)
             # Accumulate messages from same address with same subject
             for (subject, from_address, reply_to_address, author_id), ns in ngroups.iteritems():
-                if len(ns) == 1:
-                    n.send_direct(self.user_id)
-                else:
-                    Notification.send_digest(
-                        self.user_id, from_address, subject, ns, reply_to_address)
+                try:
+                    if len(ns) == 1:
+                        n.send_direct(self.user_id)
+                    else:
+                        Notification.send_digest(
+                            self.user_id, from_address, subject, ns, reply_to_address)
+                except:
+                    # log error but keep trying to deliver other notifications,
+                    # lest the other notifications (which have already been removed
+                    # from the mobx's queue in mongo) be lost
+                    log.exception(
+                        'Error sending notifications: [%s] to mbox %s (user %s)',
+                        ', '.join([n._id for n in ns]), self._id, self.user_id)
         elif self.type == 'digest':
             Notification.send_digest(
                 self.user_id, u'noreply@in.sf.net', 'Digest Email',