Posted to commits@allura.apache.org by br...@apache.org on 2013/11/19 21:33:06 UTC

git commit: [#6825] ticket:475 moved BatchIndexer from forge-classic to allura

Updated Branches:
  refs/heads/master 88c1fa16e -> 6897c237f


[#6825] ticket:475 moved BatchIndexer from forge-classic to allura


Project: http://git-wip-us.apache.org/repos/asf/incubator-allura/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-allura/commit/6897c237
Tree: http://git-wip-us.apache.org/repos/asf/incubator-allura/tree/6897c237
Diff: http://git-wip-us.apache.org/repos/asf/incubator-allura/diff/6897c237

Branch: refs/heads/master
Commit: 6897c237f7c5ff9012d6c9b4c9ebbafec715b5ae
Parents: 88c1fa1
Author: Yuriy Arhipov <yu...@yandex.ru>
Authored: Mon Nov 11 00:02:41 2013 +0400
Committer: Dave Brondsema <db...@slashdotmedia.com>
Committed: Tue Nov 19 20:32:34 2013 +0000

----------------------------------------------------------------------
 Allura/allura/model/session.py           |  99 +++++++++++++++++++
 Allura/allura/tests/unit/test_session.py | 135 ++++++++++++++++++++++++++
 ForgeImporters/forgeimporters/base.py    |   4 +-
 3 files changed, 237 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/6897c237/Allura/allura/model/session.py
----------------------------------------------------------------------
diff --git a/Allura/allura/model/session.py b/Allura/allura/model/session.py
index ff06b00..9a2c061 100644
--- a/Allura/allura/model/session.py
+++ b/Allura/allura/model/session.py
@@ -16,10 +16,15 @@
 #       under the License.
 
 import logging
+import pymongo
 
 from ming import Session
 from ming.orm.base import state
 from ming.orm.ormsession import ThreadLocalORMSession, SessionExtension
+from contextlib import contextmanager
+
+from allura.lib.utils import chunked_list
+from allura.tasks import index_tasks
 
 log = logging.getLogger(__name__)
 
@@ -83,6 +88,100 @@ class ArtifactSessionExtension(SessionExtension):
         if arefs:
             index_tasks.add_artifacts.post([aref._id for aref in arefs])
 
+class BatchIndexer(ArtifactSessionExtension):
+    """
+    Tracks needed search index operations over the life of a
+    :class:`ming.odm.session.ThreadLocalODMSession` session, and performs them
+    in a batch when :meth:`flush` is called.
+    """
+    to_delete = set()
+    to_add = set()
+
+    def __init__(self, session):
+        ArtifactSessionExtension.__init__(self, session)
+
+    def update_index(self, objects_deleted, arefs_added):
+        """
+        Caches adds and deletes for handling later.  Called after each flush of
+        the parent session.
+
+        :param objects_deleted: :class:`allura.model.artifact.Artifact`
+          instances that were deleted in the flush.
+
+        :param arefs_added: :class:`allura.model.artifact.ArtifactReference`
+          instances for all ``Artifact`` instances that were added or modified
+          in the flush.
+        """
+
+        from .index import ArtifactReference
+        del_index_ids = [obj.index_id() for obj in objects_deleted]
+        deleted_aref_ids = [aref._id for aref in
+            ArtifactReference.query.find(dict(_id={'$in': del_index_ids}))]
+        cls = self.__class__
+        cls.to_add -= set(deleted_aref_ids)
+        cls.to_delete |= set(del_index_ids)
+        cls.to_add |= set([aref._id for aref in arefs_added])
+
+    @classmethod
+    def flush(cls):
+        """
+        Creates indexing tasks for cached adds and deletes, and resets the
+        caches.
+
+        .. warning:: This method is NOT called automatically when the parent
+           session is flushed. It MUST be called explicitly.
+        """
+        # Post in chunks to avoid overflowing the max BSON document
+        # size when the Monq task is created:
+        # cls.to_delete - contains solr index ids which can easily be over
+        #                 100 bytes. Here we allow for 160 bytes avg, plus
+        #                 room for other document overhead.
+        # cls.to_add - contains BSON ObjectIds, which are 12 bytes each, so
+        #              we can easily put 1m in a doc with room left over.
+        if cls.to_delete:
+            for chunk in chunked_list(list(cls.to_delete), 100 * 1000):
+                cls._post(index_tasks.del_artifacts, chunk)
+
+        if cls.to_add:
+            for chunk in chunked_list(list(cls.to_add), 1000 * 1000):
+                cls._post(index_tasks.add_artifacts, chunk)
+        cls.to_delete = set()
+        cls.to_add = set()
+
+    @classmethod
+    def _post(cls, task_func, chunk):
+        """
+        Post task, recursively splitting and re-posting if the resulting
+        mongo document is too large.
+        """
+        try:
+            task_func.post(chunk)
+        except pymongo.errors.InvalidDocument as e:
+            # there are many types of InvalidDocument; only recurse if it's expected to help
+            if str(e).startswith('BSON document too large'):
+                cls._post(task_func, chunk[:len(chunk) // 2])
+                cls._post(task_func, chunk[len(chunk) // 2:])
+            else:
+                raise
+
+
+@contextmanager
+def substitute_extensions(session, extensions=None):
+    """
+    Temporarily replace the extensions on a
+    :class:`ming.odm.session.ThreadLocalODMSession` session.
+    """
+    original_exts = session._kwargs.get('extensions', [])
+    def _set_exts(exts):
+        session.flush()
+        session.close()
+        session._kwargs['extensions'] = exts
+    _set_exts(extensions or [])
+    yield session
+    _set_exts(original_exts)
+
+
+
 main_doc_session = Session.by_name('main')
 project_doc_session = Session.by_name('project')
 task_doc_session = Session.by_name('task')

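For context, a minimal sketch of how the two new pieces are meant to be used together (the artifact work inside the block is illustrative; only substitute_extensions and BatchIndexer come from this patch, and the pattern mirrors the ForgeImporters change below):

    from allura import model as M
    from allura.model.session import BatchIndexer, substitute_extensions

    # Swap the session's normal per-flush indexing extension for
    # BatchIndexer for the duration of the block.
    with substitute_extensions(M.artifact_orm_session, [BatchIndexer]) as session:
        # ... create, modify, or delete Artifact instances here; each flush
        # now only caches ids instead of posting individual index tasks.
        session.flush()
        # NOT called automatically (see the docstring warning): posts the
        # cached adds/deletes as chunked index tasks.
        BatchIndexer.flush()
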
http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/6897c237/Allura/allura/tests/unit/test_session.py
----------------------------------------------------------------------
diff --git a/Allura/allura/tests/unit/test_session.py b/Allura/allura/tests/unit/test_session.py
new file mode 100644
index 0000000..5b5449e
--- /dev/null
+++ b/Allura/allura/tests/unit/test_session.py
@@ -0,0 +1,135 @@
+#       Licensed to the Apache Software Foundation (ASF) under one
+#       or more contributor license agreements.  See the NOTICE file
+#       distributed with this work for additional information
+#       regarding copyright ownership.  The ASF licenses this file
+#       to you under the Apache License, Version 2.0 (the
+#       "License"); you may not use this file except in compliance
+#       with the License.  You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#       Unless required by applicable law or agreed to in writing,
+#       software distributed under the License is distributed on an
+#       "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#       KIND, either express or implied.  See the License for the
+#       specific language governing permissions and limitations
+#       under the License.
+
+import pymongo
+import mock
+
+from unittest import TestCase
+
+from allura.tests import decorators as td
+from allura.model.session import BatchIndexer, substitute_extensions
+
+
+def test_extensions_cm():
+    session = mock.Mock(_kwargs=dict(extensions=[]))
+    extension = mock.Mock()
+    with substitute_extensions(session, [extension]) as sess:
+        assert session.flush.call_count == 1
+        assert session.close.call_count == 1
+        assert sess == session
+        assert sess._kwargs['extensions'] == [extension]
+    assert session.flush.call_count == 2
+    assert session.close.call_count == 2
+    assert session._kwargs['extensions'] == []
+
+
+class TestBatchIndexer(TestCase):
+    def setUp(self):
+        session = mock.Mock()
+        self.extcls = BatchIndexer
+        self.ext = self.extcls(session)
+
+    def _mock_indexable(self, **kw):
+        m = mock.Mock(**kw)
+        m.index_id.return_value = id(m)
+        return m
+
+    @mock.patch('allura.model.ArtifactReference.query.find')
+    def test_update_index(self, find):
+        m = self._mock_indexable
+        objs_deleted = [m(_id=i) for i in (1, 2, 3)]
+        arefs = [m(_id=i) for i in (4, 5, 6)]
+        find.return_value = [m(_id=i) for i in (7, 8, 9)]
+        self.ext.update_index(objs_deleted, arefs)
+        self.assertEqual(self.ext.to_delete,
+                         set([o.index_id() for o in objs_deleted]))
+        self.assertEqual(self.ext.to_add, set([4, 5, 6]))
+
+        # test deleting something that was previously added
+        objs_deleted += [m(_id=4)]
+        find.return_value = [m(_id=4)]
+        self.ext.update_index(objs_deleted, [])
+        self.assertEqual(self.ext.to_delete,
+                         set([o.index_id() for o in objs_deleted]))
+        self.assertEqual(self.ext.to_add, set([5, 6]))
+
+    @mock.patch('allura.model.session.index_tasks')
+    def test_flush(self, index_tasks):
+        objs_deleted = [self._mock_indexable(_id=i) for i in (1, 2, 3)]
+        del_index_ids = set([o.index_id() for o in objs_deleted])
+        self.extcls.to_delete = del_index_ids
+        self.extcls.to_add = set([4, 5, 6])
+        self.ext.flush()
+        index_tasks.del_artifacts.post.assert_called_once_with(list(del_index_ids))
+        index_tasks.add_artifacts.post.assert_called_once_with([4, 5, 6])
+        self.assertEqual(self.ext.to_delete, set())
+        self.assertEqual(self.ext.to_add, set())
+
+    @mock.patch('allura.model.session.index_tasks')
+    def test_flush_chunks_huge_lists(self, index_tasks):
+        self.extcls.to_delete = set(range(100 * 1000 + 1))
+        self.extcls.to_add = set(range(1000 * 1000 + 1))
+        self.ext.flush()
+        self.assertEqual(
+            len(index_tasks.del_artifacts.post.call_args_list[0][0][0]),
+            100 * 1000)
+        self.assertEqual(
+            len(index_tasks.del_artifacts.post.call_args_list[1][0][0]), 1)
+        self.assertEqual(
+            len(index_tasks.add_artifacts.post.call_args_list[0][0][0]),
+            1000 * 1000)
+        self.assertEqual(
+            len(index_tasks.add_artifacts.post.call_args_list[1][0][0]), 1)
+        self.assertEqual(self.ext.to_delete, set())
+        self.assertEqual(self.ext.to_add, set())
+
+    @mock.patch('allura.tasks.index_tasks')
+    def test_flush_noop(self, index_tasks):
+        self.ext.flush()
+        self.assertEqual(0, index_tasks.del_artifacts.post.call_count)
+        self.assertEqual(0, index_tasks.add_artifacts.post.call_count)
+        self.assertEqual(self.ext.to_delete, set())
+        self.assertEqual(self.ext.to_add, set())
+
+    @mock.patch('allura.tasks.index_tasks')
+    def test__post_too_large(self, index_tasks):
+        def on_post(chunk):
+            if len(chunk) > 1:
+                raise pymongo.errors.InvalidDocument(
+                        "BSON document too large (16906035 bytes) - the connected server supports BSON document sizes up to 16777216 bytes.")
+        index_tasks.add_artifacts.post.side_effect = on_post
+        self.ext._post(index_tasks.add_artifacts, range(5))
+        expected = [
+            mock.call([0, 1, 2, 3, 4]),
+            mock.call([0, 1]),
+            mock.call([0]),
+            mock.call([1]),
+            mock.call([2, 3, 4]),
+            mock.call([2]),
+            mock.call([3, 4]),
+            mock.call([3]),
+            mock.call([4])
+        ]
+        self.assertEqual(expected, index_tasks.add_artifacts.post.call_args_list)
+
+    @mock.patch('allura.tasks.index_tasks')
+    def test__post_other_error(self, index_tasks):
+        def on_post(chunk):
+            raise pymongo.errors.InvalidDocument("Cannot encode object...")
+        index_tasks.add_artifacts.post.side_effect = on_post
+        with td.raises(pymongo.errors.InvalidDocument):
+            self.ext._post(index_tasks.add_artifacts, range(5))

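One helper worth noting: flush() imports chunked_list from allura.lib.utils, which is not shown in this diff. All flush() relies on is a generator yielding successive fixed-size slices; a minimal sketch of such a helper (an assumption about its behavior, not the actual Allura source) would be:

    def chunked_list(lst, size):
        """Yield successive slices of lst, each at most size items long."""
        for i in range(0, len(lst), size):
            yield lst[i:i + size]

    # e.g. list(chunked_list([1, 2, 3, 4, 5], 2)) == [[1, 2], [3, 4], [5]]

This is what test_flush_chunks_huge_lists above exercises: 100 * 1000 + 1 delete ids yield one 100,000-id chunk plus one single-id chunk.
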
http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/6897c237/ForgeImporters/forgeimporters/base.py
----------------------------------------------------------------------
diff --git a/ForgeImporters/forgeimporters/base.py b/ForgeImporters/forgeimporters/base.py
index 4f05156..412ff3d 100644
--- a/ForgeImporters/forgeimporters/base.py
+++ b/ForgeImporters/forgeimporters/base.py
@@ -116,9 +116,11 @@ def object_from_path(path):
 @task(notifications_disabled=True)
 def import_tool(importer_path, project_name=None, mount_point=None, mount_label=None, **kw):
     importer = object_from_path(importer_path)()
-    with ImportErrorHandler(importer, project_name, c.project) as handler:
+    with ImportErrorHandler(importer, project_name, c.project) as handler,\
+         M.session.substitute_extensions(M.artifact_orm_session, [M.session.BatchIndexer]):
         app = importer.import_tool(c.project, c.user, project_name=project_name,
                 mount_point=mount_point, mount_label=mount_label, **kw)
+        M.session.BatchIndexer.flush()
         if app:
             handler.success(app)
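
Note the ordering in import_tool above: BatchIndexer.flush() is called inside the substitute_extensions block, so the batched index tasks are posted before the original extensions (and their per-flush indexing) are restored, and any failure while posting them is still seen by ImportErrorHandler.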