You are viewing a plain-text version of this content; the hyperlink to the canonical version was lost in the plain-text conversion.
Posted to commits@allura.apache.org by br...@apache.org on 2013/11/19 21:33:06 UTC
git commit: [#6825] ticket:475 moved BatchIndexer from forge-classic to allura
Updated Branches:
refs/heads/master 88c1fa16e -> 6897c237f
[#6825] ticket:475 moved BatchIndexer from forge-classic to allura
Project: http://git-wip-us.apache.org/repos/asf/incubator-allura/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-allura/commit/6897c237
Tree: http://git-wip-us.apache.org/repos/asf/incubator-allura/tree/6897c237
Diff: http://git-wip-us.apache.org/repos/asf/incubator-allura/diff/6897c237
Branch: refs/heads/master
Commit: 6897c237f7c5ff9012d6c9b4c9ebbafec715b5ae
Parents: 88c1fa1
Author: Yuriy Arhipov <yu...@yandex.ru>
Authored: Mon Nov 11 00:02:41 2013 +0400
Committer: Dave Brondsema <db...@slashdotmedia.com>
Committed: Tue Nov 19 20:32:34 2013 +0000
----------------------------------------------------------------------
Allura/allura/model/session.py | 99 +++++++++++++++++++
Allura/allura/tests/unit/test_session.py | 135 ++++++++++++++++++++++++++
ForgeImporters/forgeimporters/base.py | 4 +-
3 files changed, 237 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/6897c237/Allura/allura/model/session.py
----------------------------------------------------------------------
diff --git a/Allura/allura/model/session.py b/Allura/allura/model/session.py
index ff06b00..9a2c061 100644
--- a/Allura/allura/model/session.py
+++ b/Allura/allura/model/session.py
@@ -16,10 +16,15 @@
# under the License.
import logging
+import pymongo
from ming import Session
from ming.orm.base import state
from ming.orm.ormsession import ThreadLocalORMSession, SessionExtension
+from contextlib import contextmanager
+
+from allura.lib.utils import chunked_list
+from allura.tasks import index_tasks
log = logging.getLogger(__name__)
@@ -83,6 +88,100 @@ class ArtifactSessionExtension(SessionExtension):
if arefs:
index_tasks.add_artifacts.post([aref._id for aref in arefs])
+class BatchIndexer(ArtifactSessionExtension):
+ """
+ Tracks needed search index operations over the life of a
+ :class:`ming.odm.session.ThreadLocalODMSession` session, and performs them
+ in a batch when :meth:`flush` is called.
+ """
+ to_delete = set()
+ to_add = set()
+
+ def __init__(self, session):
+ ArtifactSessionExtension.__init__(self, session)
+
+ def update_index(self, objects_deleted, arefs_added):
+ """
+ Caches adds and deletes for handling later. Called after each flush of
+ the parent session.
+
+ :param objects_deleted: :class:`allura.model.artifact.Artifact`
+ instances that were deleted in the flush.
+
+ :param arefs_added: :class:`allura.model.artifact.ArtifactReference`
+ instances for all ``Artifact`` instances that were added or modified
+ in the flush.
+ """
+
+ from .index import ArtifactReference
+ del_index_ids = [obj.index_id() for obj in objects_deleted]
+ deleted_aref_ids = [aref._id for aref in
+ ArtifactReference.query.find(dict(_id={'$in': del_index_ids}))]
+ cls = self.__class__
+ cls.to_add -= set(deleted_aref_ids)
+ cls.to_delete |= set(del_index_ids)
+ cls.to_add |= set([aref._id for aref in arefs_added])
+
+ @classmethod
+ def flush(cls):
+ """
+ Creates indexing tasks for cached adds and deletes, and resets the
+ caches.
+
+ .. warning:: This method is NOT called automatically when the parent
+ session is flushed. It MUST be called explicitly.
+ """
+ # Post in chunks to avoid overflowing the max BSON document
+ # size when the Monq task is created:
+ # cls.to_delete - contains solr index ids which can easily be over
+ # 100 bytes. Here we allow for 160 bytes avg, plus
+ # room for other document overhead.
+ # cls.to_add - contains BSON ObjectIds, which are 12 bytes each, so
+ # we can easily put 1m in a doc with room left over.
+ if cls.to_delete:
+ for chunk in chunked_list(list(cls.to_delete), 100 * 1000):
+ cls._post(index_tasks.del_artifacts, chunk)
+
+ if cls.to_add:
+ for chunk in chunked_list(list(cls.to_add), 1000 * 1000):
+ cls._post(index_tasks.add_artifacts, chunk)
+ cls.to_delete = set()
+ cls.to_add = set()
+
+ @classmethod
+ def _post(cls, task_func, chunk):
+ """
+ Post task, recursively splitting and re-posting if the resulting
+ mongo document is too large.
+ """
+ try:
+ task_func.post(chunk)
+ except pymongo.errors.InvalidDocument as e:
+ # there are many types of InvalidDocument, only recurse if its expected to help
+ if str(e).startswith('BSON document too large'):
+ cls._post(task_func, chunk[:len(chunk) // 2])
+ cls._post(task_func, chunk[len(chunk) // 2:])
+ else:
+ raise
+
+
+@contextmanager
+def substitute_extensions(session, extensions=None):
+ """
+ Temporarily replace the extensions on a
+ :class:`ming.odm.session.ThreadLocalODMSession` session.
+ """
+ original_exts = session._kwargs.get('extensions', [])
+ def _set_exts(exts):
+ session.flush()
+ session.close()
+ session._kwargs['extensions'] = exts
+ _set_exts(extensions or [])
+ yield session
+ _set_exts(original_exts)
+
+
+
main_doc_session = Session.by_name('main')
project_doc_session = Session.by_name('project')
task_doc_session = Session.by_name('task')
http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/6897c237/Allura/allura/tests/unit/test_session.py
----------------------------------------------------------------------
diff --git a/Allura/allura/tests/unit/test_session.py b/Allura/allura/tests/unit/test_session.py
new file mode 100644
index 0000000..5b5449e
--- /dev/null
+++ b/Allura/allura/tests/unit/test_session.py
@@ -0,0 +1,135 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pymongo
+import mock
+
+from unittest import TestCase
+
+from allura.tests import decorators as td
+from allura.model.session import BatchIndexer, substitute_extensions
+
+
+def test_extensions_cm():
+ session = mock.Mock(_kwargs=dict(extensions=[]))
+ extension = mock.Mock()
+ with substitute_extensions(session, [extension]) as sess:
+ assert session.flush.call_count == 1
+ assert session.close.call_count == 1
+ assert sess == session
+ assert sess._kwargs['extensions'] == [extension]
+ assert session.flush.call_count == 2
+ assert session.close.call_count == 2
+ assert session._kwargs['extensions'] == []
+
+
+class TestBatchIndexer(TestCase):
+ def setUp(self):
+ session = mock.Mock()
+ self.extcls = BatchIndexer
+ self.ext = self.extcls(session)
+
+ def _mock_indexable(self, **kw):
+ m = mock.Mock(**kw)
+ m.index_id.return_value = id(m)
+ return m
+
+ @mock.patch('allura.model.ArtifactReference.query.find')
+ def test_update_index(self, find):
+ m = self._mock_indexable
+ objs_deleted = [m(_id=i) for i in (1, 2, 3)]
+ arefs = [m(_id=i) for i in (4, 5, 6)]
+ find.return_value = [m(_id=i) for i in (7, 8, 9)]
+ self.ext.update_index(objs_deleted, arefs)
+ self.assertEqual(self.ext.to_delete,
+ set([o.index_id() for o in objs_deleted]))
+ self.assertEqual(self.ext.to_add, set([4, 5, 6]))
+
+ # test deleting something that was previously added
+ objs_deleted += [m(_id=4)]
+ find.return_value = [m(_id=4)]
+ self.ext.update_index(objs_deleted, [])
+ self.assertEqual(self.ext.to_delete,
+ set([o.index_id() for o in objs_deleted]))
+ self.assertEqual(self.ext.to_add, set([5, 6]))
+
+ @mock.patch('allura.model.session.index_tasks')
+ def test_flush(self, index_tasks):
+ objs_deleted = [self._mock_indexable(_id=i) for i in (1, 2, 3)]
+ del_index_ids = set([o.index_id() for o in objs_deleted])
+ self.extcls.to_delete = del_index_ids
+ self.extcls.to_add = set([4, 5, 6])
+ self.ext.flush()
+ index_tasks.del_artifacts.post.assert_called_once_with(list(del_index_ids))
+ index_tasks.add_artifacts.post.assert_called_once_with([4, 5, 6])
+ self.assertEqual(self.ext.to_delete, set())
+ self.assertEqual(self.ext.to_add, set())
+
+ @mock.patch('allura.model.session.index_tasks')
+ def test_flush_chunks_huge_lists(self, index_tasks):
+ self.extcls.to_delete = set(range(100 * 1000 + 1))
+ self.extcls.to_add = set(range(1000 * 1000 + 1))
+ self.ext.flush()
+ self.assertEqual(
+ len(index_tasks.del_artifacts.post.call_args_list[0][0][0]),
+ 100 * 1000)
+ self.assertEqual(
+ len(index_tasks.del_artifacts.post.call_args_list[1][0][0]), 1)
+ self.assertEqual(
+ len(index_tasks.add_artifacts.post.call_args_list[0][0][0]),
+ 1000 * 1000)
+ self.assertEqual(
+ len(index_tasks.add_artifacts.post.call_args_list[1][0][0]), 1)
+ self.assertEqual(self.ext.to_delete, set())
+ self.assertEqual(self.ext.to_add, set())
+
+ @mock.patch('allura.tasks.index_tasks')
+ def test_flush_noop(self, index_tasks):
+ self.ext.flush()
+ self.assertEqual(0, index_tasks.del_artifacts.post.call_count)
+ self.assertEqual(0, index_tasks.add_artifacts.post.call_count)
+ self.assertEqual(self.ext.to_delete, set())
+ self.assertEqual(self.ext.to_add, set())
+
+ @mock.patch('allura.tasks.index_tasks')
+ def test__post_too_large(self, index_tasks):
+ def on_post(chunk):
+ if len(chunk) > 1:
+ raise pymongo.errors.InvalidDocument(
+ "BSON document too large (16906035 bytes) - the connected server supports BSON document sizes up to 16777216 bytes.")
+ index_tasks.add_artifacts.post.side_effect = on_post
+ self.ext._post(index_tasks.add_artifacts, range(5))
+ expected = [
+ mock.call([0, 1, 2, 3, 4]),
+ mock.call([0, 1]),
+ mock.call([0]),
+ mock.call([1]),
+ mock.call([2, 3, 4]),
+ mock.call([2]),
+ mock.call([3, 4]),
+ mock.call([3]),
+ mock.call([4])
+ ]
+ self.assertEqual(expected, index_tasks.add_artifacts.post.call_args_list)
+
+ @mock.patch('allura.tasks.index_tasks')
+ def test__post_other_error(self, index_tasks):
+ def on_post(chunk):
+ raise pymongo.errors.InvalidDocument("Cannot encode object...")
+ index_tasks.add_artifacts.post.side_effect = on_post
+ with td.raises(pymongo.errors.InvalidDocument):
+ self.ext._post(index_tasks.add_artifacts, range(5))
http://git-wip-us.apache.org/repos/asf/incubator-allura/blob/6897c237/ForgeImporters/forgeimporters/base.py
----------------------------------------------------------------------
diff --git a/ForgeImporters/forgeimporters/base.py b/ForgeImporters/forgeimporters/base.py
index 4f05156..412ff3d 100644
--- a/ForgeImporters/forgeimporters/base.py
+++ b/ForgeImporters/forgeimporters/base.py
@@ -116,9 +116,11 @@ def object_from_path(path):
@task(notifications_disabled=True)
def import_tool(importer_path, project_name=None, mount_point=None, mount_label=None, **kw):
importer = object_from_path(importer_path)()
- with ImportErrorHandler(importer, project_name, c.project) as handler:
+ with ImportErrorHandler(importer, project_name, c.project) as handler,\
+ M.session.substitute_extensions(M.artifact_orm_session, [M.session.BatchIndexer]):
app = importer.import_tool(c.project, c.user, project_name=project_name,
mount_point=mount_point, mount_label=mount_label, **kw)
+ M.session.BatchIndexer.flush()
if app:
handler.success(app)