You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@allura.apache.org by di...@apache.org on 2022/09/30 13:34:13 UTC
[allura] 10/10: [#8455] remove @with_setup
This is an automated email from the ASF dual-hosted git repository.
dill0wn pushed a commit to branch dw/8455-part2
in repository https://gitbox.apache.org/repos/asf/allura.git
commit 50aa084c873c6f74d5b159f19f5369f12af7d63f
Author: Dillon Walls <di...@slashdotmedia.com>
AuthorDate: Thu Sep 29 14:04:06 2022 +0000
[#8455] remove @with_setup
---
Allura/allura/tests/model/test_artifact.py | 493 ++++++-------
Allura/allura/tests/model/test_auth.py | 823 ++++++++++-----------
Allura/allura/tests/model/test_discussion.py | 966 ++++++++++++-------------
Allura/allura/tests/model/test_monq.py | 4 +-
Allura/allura/tests/model/test_neighborhood.py | 107 ++-
Allura/allura/tests/model/test_oauth.py | 32 +-
Allura/allura/tests/model/test_project.py | 325 ++++-----
Allura/allura/tests/test_app.py | 295 ++++----
8 files changed, 1449 insertions(+), 1596 deletions(-)
diff --git a/Allura/allura/tests/model/test_artifact.py b/Allura/allura/tests/model/test_artifact.py
index 940ae0cac..35402dd0c 100644
--- a/Allura/allura/tests/model/test_artifact.py
+++ b/Allura/allura/tests/model/test_artifact.py
@@ -22,7 +22,6 @@ import re
from datetime import datetime
from tg import tmpl_context as c
-from alluratest.tools import with_setup
from mock import patch
import pytest
from ming.orm.ormsession import ThreadLocalORMSession
@@ -55,273 +54,239 @@ class Checkmessage(M.Message):
Mapper.compile_all()
-def setup_method():
- setup_basic_test()
- setup_unit_test()
- setup_with_tools()
-
-
-def teardown_module():
- ThreadLocalORMSession.close_all()
-
-
-@td.with_wiki
-def setup_with_tools():
- h.set_context('test', 'wiki', neighborhood='Projects')
- Checkmessage.query.remove({})
- WM.Page.query.remove({})
- WM.PageHistory.query.remove({})
- M.Shortlink.query.remove({})
- c.user = M.User.query.get(username='test-admin')
- Checkmessage.project = c.project
- Checkmessage.app_config = c.app.config
-
-
-@with_setup(setup_method)
-def test_artifact():
- pg = WM.Page(title='TestPage1')
- assert pg.project == c.project
- assert pg.project_id == c.project._id
- assert pg.app.config == c.app.config
- assert pg.app_config == c.app.config
- u = M.User.query.get(username='test-user')
- pr = M.ProjectRole.by_user(u, upsert=True)
- ThreadLocalORMSession.flush_all()
- REGISTRY.register(allura.credentials, allura.lib.security.Credentials())
- assert not security.has_access(pg, 'delete')(user=u)
- pg.acl.append(M.ACE.allow(pr._id, 'delete'))
- ThreadLocalORMSession.flush_all()
- assert security.has_access(pg, 'delete')(user=u)
- pg.acl.pop()
- ThreadLocalORMSession.flush_all()
- assert not security.has_access(pg, 'delete')(user=u)
-
-
-def test_artifact_index():
- pg = WM.Page(title='TestPage1')
- idx = pg.index()
- assert 'title' in idx
- assert 'url_s' in idx
- assert 'project_id_s' in idx
- assert 'mount_point_s' in idx
- assert 'type_s' in idx
- assert 'id' in idx
- assert idx['id'] == pg.index_id()
- assert 'text' in idx
- assert 'TestPage' in pg.shorthand_id()
- assert pg.link_text() == pg.shorthand_id()
-
-
-@with_setup(setup_method)
-def test_artifactlink():
- pg = WM.Page(title='TestPage2')
- q_shortlink = M.Shortlink.query.find(dict(
- project_id=c.project._id,
- app_config_id=c.app.config._id,
- link=pg.shorthand_id()))
- assert q_shortlink.count() == 0
-
- ThreadLocalORMSession.flush_all()
- M.MonQTask.run_ready()
- ThreadLocalORMSession.flush_all()
- assert q_shortlink.count() == 1
-
- assert M.Shortlink.lookup('[TestPage2]')
- assert M.Shortlink.lookup('[wiki:TestPage2]')
- assert M.Shortlink.lookup('[test:wiki:TestPage2]')
- assert not M.Shortlink.lookup('[test:wiki:TestPage2:foo]')
- assert not M.Shortlink.lookup('[Wiki:TestPage2]')
- assert not M.Shortlink.lookup('[TestPage2_no_such_page]')
-
- pg.delete()
- c.project.uninstall_app('wiki')
- assert not M.Shortlink.lookup('[wiki:TestPage2]')
- assert q_shortlink.count() == 0
-
-
-@with_setup(setup_method)
-def test_gen_messageid():
- assert re.match(r'[0-9a-zA-Z]*.wiki@test.p.localhost',
- h.gen_message_id())
-
-
-@with_setup(setup_method)
-def test_gen_messageid_with_id_set():
- oid = ObjectId()
- assert re.match(r'%s.wiki@test.p.localhost' %
- str(oid), h.gen_message_id(oid))
-
-
-@with_setup(setup_method)
-def test_artifact_messageid():
- p = WM.Page(title='T')
- assert re.match(r'%s.wiki@test.p.localhost' %
- str(p._id), p.message_id())
-
-
-@with_setup(setup_method)
-def test_versioning():
- pg = WM.Page(title='TestPage3')
- with patch('allura.model.artifact.request',
- Request.blank('/', remote_addr='1.1.1.1')):
+class TestArtifact:
+
+ def setup_method(self):
+ setup_basic_test()
+ setup_unit_test()
+ self.setup_with_tools()
+
+ def teardown_class(cls):
+ ThreadLocalORMSession.close_all()
+
+ @td.with_wiki
+ def setup_with_tools(self):
+ h.set_context('test', 'wiki', neighborhood='Projects')
+ Checkmessage.query.remove({})
+ WM.Page.query.remove({})
+ WM.PageHistory.query.remove({})
+ M.Shortlink.query.remove({})
+ c.user = M.User.query.get(username='test-admin')
+ Checkmessage.project = c.project
+ Checkmessage.app_config = c.app.config
+
+ def test_artifact(self):
+ pg = WM.Page(title='TestPage1')
+ assert pg.project == c.project
+ assert pg.project_id == c.project._id
+ assert pg.app.config == c.app.config
+ assert pg.app_config == c.app.config
+ u = M.User.query.get(username='test-user')
+ pr = M.ProjectRole.by_user(u, upsert=True)
+ ThreadLocalORMSession.flush_all()
+ REGISTRY.register(allura.credentials, allura.lib.security.Credentials())
+ assert not security.has_access(pg, 'delete')(user=u)
+ pg.acl.append(M.ACE.allow(pr._id, 'delete'))
+ ThreadLocalORMSession.flush_all()
+ assert security.has_access(pg, 'delete')(user=u)
+ pg.acl.pop()
+ ThreadLocalORMSession.flush_all()
+ assert not security.has_access(pg, 'delete')(user=u)
+
+ def test_artifact_index(self):
+ pg = WM.Page(title='TestPage1')
+ idx = pg.index()
+ assert 'title' in idx
+ assert 'url_s' in idx
+ assert 'project_id_s' in idx
+ assert 'mount_point_s' in idx
+ assert 'type_s' in idx
+ assert 'id' in idx
+ assert idx['id'] == pg.index_id()
+ assert 'text' in idx
+ assert 'TestPage' in pg.shorthand_id()
+ assert pg.link_text() == pg.shorthand_id()
+
+ def test_artifactlink(self):
+ pg = WM.Page(title='TestPage2')
+ q_shortlink = M.Shortlink.query.find(dict(
+ project_id=c.project._id,
+ app_config_id=c.app.config._id,
+ link=pg.shorthand_id()))
+ assert q_shortlink.count() == 0
+
+ ThreadLocalORMSession.flush_all()
+ M.MonQTask.run_ready()
+ ThreadLocalORMSession.flush_all()
+ assert q_shortlink.count() == 1
+
+ assert M.Shortlink.lookup('[TestPage2]')
+ assert M.Shortlink.lookup('[wiki:TestPage2]')
+ assert M.Shortlink.lookup('[test:wiki:TestPage2]')
+ assert not M.Shortlink.lookup('[test:wiki:TestPage2:foo]')
+ assert not M.Shortlink.lookup('[Wiki:TestPage2]')
+ assert not M.Shortlink.lookup('[TestPage2_no_such_page]')
+
+ pg.delete()
+ c.project.uninstall_app('wiki')
+ assert not M.Shortlink.lookup('[wiki:TestPage2]')
+ assert q_shortlink.count() == 0
+
+ def test_gen_messageid(self):
+ assert re.match(r'[0-9a-zA-Z]*.wiki@test.p.localhost',
+ h.gen_message_id())
+
+ def test_gen_messageid_with_id_set(self):
+ oid = ObjectId()
+ assert re.match(r'%s.wiki@test.p.localhost' %
+ str(oid), h.gen_message_id(oid))
+
+ def test_artifact_messageid(self):
+ p = WM.Page(title='T')
+ assert re.match(r'%s.wiki@test.p.localhost' %
+ str(p._id), p.message_id())
+
+ def test_versioning(self):
+ pg = WM.Page(title='TestPage3')
+ with patch('allura.model.artifact.request', Request.blank('/', remote_addr='1.1.1.1')):
+ pg.commit()
+ ThreadLocalORMSession.flush_all()
+ pg.text = 'Here is some text'
pg.commit()
- ThreadLocalORMSession.flush_all()
- pg.text = 'Here is some text'
- pg.commit()
- ThreadLocalORMSession.flush_all()
- ss = pg.get_version(1)
- assert ss.author.logged_ip == '1.1.1.1'
- assert ss.index()['is_history_b']
- assert ss.shorthand_id() == pg.shorthand_id() + '#1'
- assert ss.title == pg.title
- assert ss.text != pg.text
- ss = pg.get_version(-1)
- assert ss.index()['is_history_b']
- assert ss.shorthand_id() == pg.shorthand_id() + '#2'
- assert ss.title == pg.title
- assert ss.text == pg.text
- pytest.raises(IndexError, pg.get_version, 42)
- pg.revert(1)
- pg.commit()
- ThreadLocalORMSession.flush_all()
- assert ss.text != pg.text
- assert pg.history().count() == 3
-
-
-@with_setup(setup_method)
-def test_messages_unknown_lookup():
- from bson import ObjectId
- m = Checkmessage()
- m.author_id = ObjectId() # something new
- assert isinstance(m.author(), M.User), type(m.author())
- assert m.author() == M.User.anonymous()
-
-
-@with_setup(setup_method)
-@patch('allura.model.artifact.datetime')
-def test_last_updated(_datetime):
- c.project.last_updated = datetime(2014, 1, 1)
- _datetime.utcnow.return_value = datetime(2014, 1, 2)
- WM.Page(title='TestPage1')
- ThreadLocalORMSession.flush_all()
- assert c.project.last_updated == datetime(2014, 1, 2)
-
-
-@with_setup(setup_method)
-@patch('allura.model.artifact.datetime')
-def test_last_updated_disabled(_datetime):
- c.project.last_updated = datetime(2014, 1, 1)
- _datetime.utcnow.return_value = datetime(2014, 1, 2)
- try:
- M.artifact_orm_session._get().skip_last_updated = True
+ ThreadLocalORMSession.flush_all()
+ ss = pg.get_version(1)
+ assert ss.author.logged_ip == '1.1.1.1'
+ assert ss.index()['is_history_b']
+ assert ss.shorthand_id() == pg.shorthand_id() + '#1'
+ assert ss.title == pg.title
+ assert ss.text != pg.text
+ ss = pg.get_version(-1)
+ assert ss.index()['is_history_b']
+ assert ss.shorthand_id() == pg.shorthand_id() + '#2'
+ assert ss.title == pg.title
+ assert ss.text == pg.text
+ pytest.raises(IndexError, pg.get_version, 42)
+ pg.revert(1)
+ pg.commit()
+ ThreadLocalORMSession.flush_all()
+ assert ss.text != pg.text
+ assert pg.history().count() == 3
+
+ def test_messages_unknown_lookup(self):
+ from bson import ObjectId
+ m = Checkmessage()
+ m.author_id = ObjectId() # something new
+ assert isinstance(m.author(), M.User), type(m.author())
+ assert m.author() == M.User.anonymous()
+
+ @patch('allura.model.artifact.datetime')
+ def test_last_updated(self, _datetime):
+ c.project.last_updated = datetime(2014, 1, 1)
+ _datetime.utcnow.return_value = datetime(2014, 1, 2)
WM.Page(title='TestPage1')
ThreadLocalORMSession.flush_all()
- assert c.project.last_updated == datetime(2014, 1, 1)
- finally:
- M.artifact_orm_session._get().skip_last_updated = False
-
-
-@with_setup(setup_method)
-def test_get_discussion_thread_dupe():
- artif = WM.Page(title='TestSomeArtifact')
- thr1 = artif.get_discussion_thread()[0]
- thr1.post('thr1-post1')
- thr1.post('thr1-post2')
- thr2 = M.Thread.new(ref_id=thr1.ref_id)
- thr2.post('thr2-post1')
- thr2.post('thr2-post2')
- thr2.post('thr2-post3')
- thr3 = M.Thread.new(ref_id=thr1.ref_id)
- thr3.post('thr3-post1')
- thr4 = M.Thread.new(ref_id=thr1.ref_id)
-
- thread_q = M.Thread.query.find(dict(ref_id=artif.index_id()))
- assert thread_q.count() == 4
-
- thread = artif.get_discussion_thread()[0] # force cleanup
- threads = thread_q.all()
- assert len(threads) == 1
- assert len(thread.posts) == 6
- assert not any(p.deleted for p in thread.posts) # normal thread deletion propagates to children, make sure that doesn't happen
-
-
-def test_snapshot_clear_user_data():
- s = M.Snapshot(author={'username': 'johndoe',
- 'display_name': 'John Doe',
- 'logged_ip': '1.2.3.4'})
- s.clear_user_data()
- assert s.author == {'username': '',
+ assert c.project.last_updated == datetime(2014, 1, 2)
+
+ @patch('allura.model.artifact.datetime')
+ def test_last_updated_disabled(self, _datetime):
+ c.project.last_updated = datetime(2014, 1, 1)
+ _datetime.utcnow.return_value = datetime(2014, 1, 2)
+ try:
+ M.artifact_orm_session._get().skip_last_updated = True
+ WM.Page(title='TestPage1')
+ ThreadLocalORMSession.flush_all()
+ assert c.project.last_updated == datetime(2014, 1, 1)
+ finally:
+ M.artifact_orm_session._get().skip_last_updated = False
+
+ def test_get_discussion_thread_dupe(self):
+ artif = WM.Page(title='TestSomeArtifact')
+ thr1 = artif.get_discussion_thread()[0]
+ thr1.post('thr1-post1')
+ thr1.post('thr1-post2')
+ thr2 = M.Thread.new(ref_id=thr1.ref_id)
+ thr2.post('thr2-post1')
+ thr2.post('thr2-post2')
+ thr2.post('thr2-post3')
+ thr3 = M.Thread.new(ref_id=thr1.ref_id)
+ thr3.post('thr3-post1')
+ thr4 = M.Thread.new(ref_id=thr1.ref_id)
+
+ thread_q = M.Thread.query.find(dict(ref_id=artif.index_id()))
+ assert thread_q.count() == 4
+
+ thread = artif.get_discussion_thread()[0] # force cleanup
+ threads = thread_q.all()
+ assert len(threads) == 1
+ assert len(thread.posts) == 6
+ # normal thread deletion propagates to children, make sure that doesn't happen
+ assert not any(p.deleted for p in thread.posts)
+
+ def test_snapshot_clear_user_data(self):
+ s = M.Snapshot(author={'username': 'johndoe',
+ 'display_name': 'John Doe',
+ 'logged_ip': '1.2.3.4'})
+ s.clear_user_data()
+ assert s.author == {'username': '',
'display_name': '',
'logged_ip': None,
- 'id': None,
- }
-
-
-@with_setup(setup_method)
-def test_snapshot_from_username():
- s = M.Snapshot(author={'username': 'johndoe',
- 'display_name': 'John Doe',
- 'logged_ip': '1.2.3.4'})
- s = M.Snapshot(author={'username': 'johnsmith',
- 'display_name': 'John Doe',
- 'logged_ip': '1.2.3.4'})
- ThreadLocalORMSession.flush_all()
- assert len(M.Snapshot.from_username('johndoe')) == 1
-
-
-def test_feed_clear_user_data():
- f = M.Feed(author_name='John Doe',
+ 'id': None}
+
+ def test_snapshot_from_username(self):
+ s = M.Snapshot(author={'username': 'johndoe',
+ 'display_name': 'John Doe',
+ 'logged_ip': '1.2.3.4'})
+ s = M.Snapshot(author={'username': 'johnsmith',
+ 'display_name': 'John Doe',
+ 'logged_ip': '1.2.3.4'})
+ ThreadLocalORMSession.flush_all()
+ assert len(M.Snapshot.from_username('johndoe')) == 1
+
+ def test_feed_clear_user_data(self):
+ f = M.Feed(author_name='John Doe',
+ author_link='/u/johndoe/',
+ title='Something')
+ f.clear_user_data()
+ assert f.author_name == ''
+ assert f.author_link == ''
+ assert f.title == 'Something'
+
+ f = M.Feed(author_name='John Doe',
+ author_link='/u/johndoe/',
+ title='Home Page modified by John Doe')
+ f.clear_user_data()
+ assert f.author_name == ''
+ assert f.author_link == ''
+ assert f.title == 'Home Page modified by <REDACTED>'
+
+ def test_feed_from_username(self):
+ M.Feed(author_name='John Doe',
author_link='/u/johndoe/',
title='Something')
- f.clear_user_data()
- assert f.author_name == ''
- assert f.author_link == ''
- assert f.title == 'Something'
-
- f = M.Feed(author_name='John Doe',
- author_link='/u/johndoe/',
- title='Home Page modified by John Doe')
- f.clear_user_data()
- assert f.author_name == ''
- assert f.author_link == ''
- assert f.title == 'Home Page modified by <REDACTED>'
-
-
-@with_setup(setup_method)
-def test_feed_from_username():
- M.Feed(author_name='John Doe',
- author_link='/u/johndoe/',
- title='Something')
- M.Feed(author_name='John Smith',
- author_link='/u/johnsmith/',
- title='Something')
- ThreadLocalORMSession.flush_all()
- assert len(M.Feed.from_username('johndoe')) == 1
-
-
-@with_setup(setup_method)
-def test_subscribed():
- pg = WM.Page(title='TestPage4a')
- assert pg.subscribed(include_parents=True) # tool is subscribed to admins by default
- assert not pg.subscribed(include_parents=False)
-
-
-@with_setup(setup_method)
-def test_subscribed_no_tool_sub():
- pg = WM.Page(title='TestPage4b')
- M.Mailbox.unsubscribe(user_id=c.user._id,
- project_id=c.project._id,
- app_config_id=c.app.config._id)
- pg.subscribe()
- assert pg.subscribed(include_parents=True)
- assert pg.subscribed(include_parents=False)
-
-
-@with_setup(setup_method)
-def test_not_subscribed():
- pg = WM.Page(title='TestPage4c')
- M.Mailbox.unsubscribe(user_id=c.user._id,
- project_id=c.project._id,
- app_config_id=c.app.config._id)
- assert not pg.subscribed(include_parents=True)
- assert not pg.subscribed(include_parents=False)
+ M.Feed(author_name='John Smith',
+ author_link='/u/johnsmith/',
+ title='Something')
+ ThreadLocalORMSession.flush_all()
+ assert len(M.Feed.from_username('johndoe')) == 1
+
+ def test_subscribed(self):
+ pg = WM.Page(title='TestPage4a')
+ assert pg.subscribed(include_parents=True) # tool is subscribed to admins by default
+ assert not pg.subscribed(include_parents=False)
+
+ def test_subscribed_no_tool_sub(self):
+ pg = WM.Page(title='TestPage4b')
+ M.Mailbox.unsubscribe(user_id=c.user._id,
+ project_id=c.project._id,
+ app_config_id=c.app.config._id)
+ pg.subscribe()
+ assert pg.subscribed(include_parents=True)
+ assert pg.subscribed(include_parents=False)
+
+ def test_not_subscribed(self):
+ pg = WM.Page(title='TestPage4c')
+ M.Mailbox.unsubscribe(user_id=c.user._id,
+ project_id=c.project._id,
+ app_config_id=c.app.config._id)
+ assert not pg.subscribed(include_parents=True)
+ assert not pg.subscribed(include_parents=False)
diff --git a/Allura/allura/tests/model/test_auth.py b/Allura/allura/tests/model/test_auth.py
index b4e5e80de..7b6364938 100644
--- a/Allura/allura/tests/model/test_auth.py
+++ b/Allura/allura/tests/model/test_auth.py
@@ -22,15 +22,7 @@ Model tests for auth
import textwrap
from datetime import datetime, timedelta
-from alluratest.tools import (
- with_setup,
- assert_equal,
- assert_not_equal,
- assert_true,
- assert_not_in,
- assert_in,
-)
-from tg import tmpl_context as c, app_globals as g, request
+from tg import tmpl_context as c, app_globals as g, request as r
from webob import Request
from mock import patch, Mock
@@ -41,437 +33,404 @@ from allura import model as M
from allura.lib import helpers as h
from allura.lib import plugin
from allura.tests import decorators as td
-from alluratest.controller import setup_basic_test, setup_global_objects, setup_functional_test
+from alluratest.controller import setup_basic_test, setup_global_objects, setup_functional_test, setup_unit_test
from alluratest.pytest_helpers import with_nose_compatibility
-def setup_method():
- setup_basic_test()
- ThreadLocalORMSession.close_all()
- setup_global_objects()
-
-
-@with_setup(setup_method)
-def test_email_address():
- addr = M.EmailAddress(email='test_admin@domain.net',
- claimed_by_user_id=c.user._id)
- ThreadLocalORMSession.flush_all()
- assert addr.claimed_by_user() == c.user
- addr2 = M.EmailAddress.create('test@domain.net')
- addr3 = M.EmailAddress.create('test_admin@domain.net')
-
- # Duplicate emails are allowed, until the email is confirmed
- assert addr3 is not addr
-
- assert addr2 is not addr
- assert addr2
- addr4 = M.EmailAddress.create('test@DOMAIN.NET')
- assert addr4 is not addr2
-
- assert addr is c.user.address_object('test_admin@domain.net')
- c.user.claim_address('test@DOMAIN.NET')
- assert 'test@domain.net' in c.user.email_addresses
-
-
-@with_setup(setup_method)
-def test_email_address_lookup_helpers():
- addr = M.EmailAddress.create('TEST@DOMAIN.NET')
- nobody = M.EmailAddress.create('nobody@example.com')
- ThreadLocalORMSession.flush_all()
- assert addr.email == 'TEST@domain.net'
-
- assert M.EmailAddress.get(email='TEST@DOMAIN.NET') == addr
- assert M.EmailAddress.get(email='TEST@domain.net') == addr
- assert M.EmailAddress.get(email='test@domain.net') == None
- assert M.EmailAddress.get(email=None) == None
- assert M.EmailAddress.get(email='nobody@example.com') == nobody
- # invalid email returns None, but not nobody@example.com as before
- assert M.EmailAddress.get(email='invalid') == None
-
- assert M.EmailAddress.find(dict(email='TEST@DOMAIN.NET')).all() == [addr]
- assert M.EmailAddress.find(dict(email='TEST@domain.net')).all() == [addr]
- assert M.EmailAddress.find(dict(email='test@domain.net')).all() == []
- assert M.EmailAddress.find(dict(email=None)).all() == []
- assert M.EmailAddress.find(dict(email='nobody@example.com')).all() == [nobody]
- # invalid email returns empty query, but not nobody@example.com as before
- assert M.EmailAddress.find(dict(email='invalid')).all() == []
-
-
-@with_setup(setup_method)
-def test_email_address_canonical():
- assert (M.EmailAddress.canonical('nobody@EXAMPLE.COM') ==
- 'nobody@example.com')
- assert (M.EmailAddress.canonical('nobody@example.com') ==
- 'nobody@example.com')
- assert (M.EmailAddress.canonical('I Am Nobody <nobody@example.com>') ==
- 'nobody@example.com')
- assert (M.EmailAddress.canonical(' nobody@example.com\t') ==
- 'nobody@example.com')
- assert (M.EmailAddress.canonical('I Am@Nobody <nobody@example.com> ') ==
- 'nobody@example.com')
- assert (M.EmailAddress.canonical(' No@body <no@body@example.com> ') ==
- 'no@body@example.com')
- assert (M.EmailAddress.canonical('no@body@example.com') ==
- 'no@body@example.com')
- assert M.EmailAddress.canonical('invalid') == None
-
-@with_setup(setup_method)
-def test_email_address_send_verification_link():
- addr = M.EmailAddress(email='test_admin@domain.net',
- claimed_by_user_id=c.user._id)
-
- addr.send_verification_link()
-
- with patch('allura.tasks.mail_tasks.smtp_client._client') as _client:
- M.MonQTask.run_ready()
- return_path, rcpts, body = _client.sendmail.call_args[0]
- assert rcpts == ['test_admin@domain.net']
-
-
-@with_setup(setup_method)
-@td.with_user_project('test-admin')
-def test_user():
- assert c.user.url() .endswith('/u/test-admin/')
- assert c.user.script_name .endswith('/u/test-admin/')
- assert ({p.shortname for p in c.user.my_projects()} ==
- {'test', 'test2', 'u/test-admin', 'adobe-1', '--init--'})
- # delete one of the projects and make sure it won't appear in my_projects()
- p = M.Project.query.get(shortname='test2')
- p.deleted = True
- ThreadLocalORMSession.flush_all()
- assert ({p.shortname for p in c.user.my_projects()} ==
- {'test', 'u/test-admin', 'adobe-1', '--init--'})
- u = M.User.register(dict(
- username='nosetest-user'))
- ThreadLocalORMSession.flush_all()
- assert u.reg_date
- assert u.private_project().shortname == 'u/nosetest-user'
- roles = g.credentials.user_roles(
- u._id, project_id=u.private_project().root_project._id)
- assert len(roles) == 3, roles
- u.set_password('foo')
- provider = plugin.LocalAuthenticationProvider(Request.blank('/'))
- assert provider._validate_password(u, 'foo')
- assert not provider._validate_password(u, 'foobar')
- u.set_password('foobar')
- assert provider._validate_password(u, 'foobar')
- assert not provider._validate_password(u, 'foo')
-
-
-@with_setup(setup_method)
-def test_user_project_creates_on_demand():
- u = M.User.register(dict(username='foobar123'), make_project=False)
- ThreadLocalORMSession.flush_all()
- assert not M.Project.query.get(shortname='u/foobar123')
- assert u.private_project()
- assert M.Project.query.get(shortname='u/foobar123')
-
-
-@with_setup(setup_method)
-def test_user_project_already_deleted_creates_on_demand():
- u = M.User.register(dict(username='foobar123'), make_project=True)
- p = M.Project.query.get(shortname='u/foobar123')
- p.deleted = True
- ThreadLocalORMSession.flush_all()
- assert not M.Project.query.get(shortname='u/foobar123', deleted=False)
- assert u.private_project()
- ThreadLocalORMSession.flush_all()
- assert M.Project.query.get(shortname='u/foobar123', deleted=False)
-
-
-@with_setup(setup_method)
-def test_user_project_does_not_create_on_demand_for_disabled_user():
- u = M.User.register(
- dict(username='foobar123', disabled=True), make_project=False)
- ThreadLocalORMSession.flush_all()
- assert not u.private_project()
- assert not M.Project.query.get(shortname='u/foobar123')
-
-
-@with_setup(setup_method)
-def test_user_project_does_not_create_on_demand_for_anonymous_user():
- u = M.User.anonymous()
- ThreadLocalORMSession.flush_all()
- assert not u.private_project()
- assert not M.Project.query.get(shortname='u/anonymous')
- assert not M.Project.query.get(shortname='u/*anonymous')
-
-
-@with_setup(setup_method)
-@patch('allura.model.auth.log')
-def test_user_by_email_address(log):
- u1 = M.User.register(dict(username='abc1'), make_project=False)
- u2 = M.User.register(dict(username='abc2'), make_project=False)
- addr1 = M.EmailAddress(email='abc123@abc.me', confirmed=True,
- claimed_by_user_id=u1._id)
- addr2 = M.EmailAddress(email='abc123@abc.me', confirmed=True,
- claimed_by_user_id=u2._id)
- # both users are disabled
- u1.disabled, u2.disabled = True, True
- ThreadLocalORMSession.flush_all()
- assert M.User.by_email_address('abc123@abc.me') == None
- assert log.warn.call_count == 0
-
- # only u2 is active
- u1.disabled, u2.disabled = True, False
- ThreadLocalORMSession.flush_all()
- assert M.User.by_email_address('abc123@abc.me') == u2
- assert log.warn.call_count == 0
-
- # both are active
- u1.disabled, u2.disabled = False, False
- ThreadLocalORMSession.flush_all()
- assert M.User.by_email_address('abc123@abc.me') in [u1, u2]
- assert log.warn.call_count == 1
-
- # invalid email returns None, but not user which claimed
- # nobody@example.com as before
- nobody = M.EmailAddress(email='nobody@example.com', confirmed=True,
- claimed_by_user_id=u1._id)
- ThreadLocalORMSession.flush_all()
- assert M.User.by_email_address('nobody@example.com') == u1
- assert M.User.by_email_address('invalid') == None
-
-
-def test_user_equality():
- assert M.User.by_username('test-user') == M.User.by_username('test-user')
- assert M.User.anonymous() == M.User.anonymous()
- assert M.User.by_username('*anonymous') == M.User.anonymous()
-
- assert M.User.by_username('test-user') != M.User.by_username('test-admin')
- assert M.User.by_username('test-user') != M.User.anonymous()
- assert M.User.anonymous() != None
- assert M.User.anonymous() != 12345
- assert M.User.anonymous() != M.User()
-
-
-def test_user_hash():
- assert M.User.by_username('test-user') in {M.User.by_username('test-user')}
- assert M.User.anonymous() in {M.User.anonymous()}
- assert M.User.by_username('*anonymous') in {M.User.anonymous()}
-
- assert M.User.by_username('test-user') not in {M.User.by_username('test-admin')}
- assert M.User.anonymous() not in {M.User.by_username('test-admin')}
- assert M.User.anonymous() not in {0, None}
-
-
-@with_setup(setup_method)
-def test_project_role():
- role = M.ProjectRole(project_id=c.project._id, name='test_role')
- M.ProjectRole.by_user(c.user, upsert=True).roles.append(role._id)
- ThreadLocalORMSession.flush_all()
- roles = g.credentials.user_roles(
- c.user._id, project_id=c.project.root_project._id)
- roles_ids = [r['_id'] for r in roles]
- roles = M.ProjectRole.query.find({'_id': {'$in': roles_ids}})
- for pr in roles:
- assert pr.display()
- pr.special
- assert pr.user in (c.user, None, M.User.anonymous())
-
-
-@with_setup(setup_method)
-def test_default_project_roles():
- roles = {
- pr.name: pr
- for pr in M.ProjectRole.query.find(dict(
- project_id=c.project._id)).all()
- if pr.name}
- assert 'Admin' in list(roles.keys()), list(roles.keys())
- assert 'Developer' in list(roles.keys()), list(roles.keys())
- assert 'Member' in list(roles.keys()), list(roles.keys())
- assert roles['Developer']._id in roles['Admin'].roles
- assert roles['Member']._id in roles['Developer'].roles
-
- # There're 1 user assigned to project, represented by
- # relational (vs named) ProjectRole's
- assert len(roles) == M.ProjectRole.query.find(dict(
- project_id=c.project._id)).count() - 1
-
-
-@with_setup(setup_method)
-def test_email_address_claimed_by_user():
- addr = M.EmailAddress(email='test_admin@domain.net',
- claimed_by_user_id=c.user._id)
- c.user.disabled = True
- ThreadLocalORMSession.flush_all()
- assert addr.claimed_by_user() is None
-
-
-@with_setup(setup_method)
-@td.with_user_project('test-admin')
-def test_user_projects_by_role():
- assert ({p.shortname for p in c.user.my_projects()} ==
- {'test', 'test2', 'u/test-admin', 'adobe-1', '--init--'})
- assert ({p.shortname for p in c.user.my_projects_by_role_name('Admin')} ==
- {'test', 'test2', 'u/test-admin', 'adobe-1', '--init--'})
- # Remove admin access from c.user to test2 project
- project = M.Project.query.get(shortname='test2')
- admin_role = M.ProjectRole.by_name('Admin', project)
- developer_role = M.ProjectRole.by_name('Developer', project)
- user_role = M.ProjectRole.by_user(c.user, project=project, upsert=True)
- user_role.roles.remove(admin_role._id)
- user_role.roles.append(developer_role._id)
- ThreadLocalORMSession.flush_all()
- g.credentials.clear()
- assert ({p.shortname for p in c.user.my_projects()} ==
- {'test', 'test2', 'u/test-admin', 'adobe-1', '--init--'})
- assert ({p.shortname for p in c.user.my_projects_by_role_name('Admin')} ==
- {'test', 'u/test-admin', 'adobe-1', '--init--'})
-
-
-@td.with_user_project('test-admin')
-@with_setup(setup_method)
-def test_user_projects_unnamed():
- """
- Confirm that spurious ProjectRoles associating a user with
- a project to which they do not belong to any named group
- don't cause the user to count as a member of the project.
- """
- sub1 = M.Project.query.get(shortname='test/sub1')
- M.ProjectRole(
- user_id=c.user._id,
- project_id=sub1._id)
- ThreadLocalORMSession.flush_all()
- project_names = [p.shortname for p in c.user.my_projects()]
- assert 'test/sub1' not in project_names
- assert 'test' in project_names
-
-
-@patch.object(g, 'user_message_max_messages', 3)
-def test_check_sent_user_message_times():
- user1 = M.User.register(dict(username='test-user'), make_project=False)
- time1 = datetime.utcnow() - timedelta(minutes=30)
- time2 = datetime.utcnow() - timedelta(minutes=45)
- time3 = datetime.utcnow() - timedelta(minutes=70)
- user1.sent_user_message_times = [time1, time2, time3]
- assert user1.can_send_user_message()
- assert len(user1.sent_user_message_times) == 2
- user1.sent_user_message_times.append(
- datetime.utcnow() - timedelta(minutes=15))
- assert not user1.can_send_user_message()
-
-
-@with_setup(setup_method)
-@td.with_user_project('test-admin')
-def test_user_track_active():
- # without this session flushing inside track_active raises Exception
- setup_functional_test()
- c.user = M.User.by_username('test-admin')
-
- assert c.user.last_access['session_date'] == None
- assert c.user.last_access['session_ip'] == None
- assert c.user.last_access['session_ua'] == None
-
- req = Mock(headers={'User-Agent': 'browser'}, remote_addr='addr')
- c.user.track_active(req)
- c.user = M.User.by_username(c.user.username)
- assert c.user.last_access['session_date'] != None
- assert c.user.last_access['session_ip'] == 'addr'
- assert c.user.last_access['session_ua'] == 'browser'
-
- # ensure that session activity tracked with a whole-day granularity
- prev_date = c.user.last_access['session_date']
- c.user.track_active(req)
- c.user = M.User.by_username(c.user.username)
- assert c.user.last_access['session_date'] == prev_date
- yesterday = datetime.utcnow() - timedelta(1)
- c.user.last_access['session_date'] = yesterday
- session(c.user).flush(c.user)
- c.user.track_active(req)
- c.user = M.User.by_username(c.user.username)
- assert c.user.last_access['session_date'] > yesterday
-
- # ...or if IP or User Agent has changed
- req.remote_addr = 'new addr'
- c.user.track_active(req)
- c.user = M.User.by_username(c.user.username)
- assert c.user.last_access['session_ip'] == 'new addr'
- assert c.user.last_access['session_ua'] == 'browser'
- req.headers['User-Agent'] = 'new browser'
- c.user.track_active(req)
- c.user = M.User.by_username(c.user.username)
- assert c.user.last_access['session_ip'] == 'new addr'
- assert c.user.last_access['session_ua'] == 'new browser'
-
-
-@with_setup(setup_method)
-def test_user_index():
- c.user.email_addresses = ['email1', 'email2']
- c.user.set_pref('email_address', 'email2')
- idx = c.user.index()
- assert idx['id'] == c.user.index_id()
- assert idx['title'] == 'User test-admin'
- assert idx['type_s'] == 'User'
- assert idx['username_s'] == 'test-admin'
- assert idx['email_addresses_t'] == 'email1 email2'
- assert idx['email_address_s'] == 'email2'
- assert 'last_password_updated_dt' in idx
- assert idx['disabled_b'] == False
- assert 'results_per_page_i' in idx
- assert 'email_format_s' in idx
- assert 'disable_user_messages_b' in idx
- assert idx['display_name_t'] == 'Test Admin'
- assert idx['sex_s'] == 'Unknown'
- assert 'birthdate_dt' in idx
- assert 'localization_s' in idx
- assert 'timezone_s' in idx
- assert 'socialnetworks_t' in idx
- assert 'telnumbers_t' in idx
- assert 'skypeaccount_s' in idx
- assert 'webpages_t' in idx
- assert 'skills_t' in idx
- assert 'last_access_login_date_dt' in idx
- assert 'last_access_login_ip_s' in idx
- assert 'last_access_login_ua_t' in idx
- assert 'last_access_session_date_dt' in idx
- assert 'last_access_session_ip_s' in idx
- assert 'last_access_session_ua_t' in idx
- # provided bby auth provider
- assert 'user_registration_date_dt' in idx
-
-
-@with_setup(setup_method)
-def test_user_index_none_values():
- c.user.email_addresses = [None]
- c.user.set_pref('telnumbers', [None])
- c.user.set_pref('webpages', [None])
- idx = c.user.index()
- assert idx['email_addresses_t'] == ''
- assert idx['telnumbers_t'] == ''
- assert idx['webpages_t'] == ''
-
-
-@with_setup(setup_method)
-def test_user_backfill_login_details():
- with h.push_config(request, user_agent='TestBrowser/55'):
- # these shouldn't match
- h.auditlog_user('something happened')
- h.auditlog_user('blah blah Password changed')
- with h.push_config(request, user_agent='TestBrowser/56'):
- # these should all match, but only one entry created for this ip/ua
- h.auditlog_user('Account activated')
- h.auditlog_user('Successful login')
- h.auditlog_user('Password changed')
- with h.push_config(request, user_agent='TestBrowser/57'):
- # this should match too
- h.auditlog_user('Set up multifactor TOTP')
- ThreadLocalORMSession.flush_all()
-
- auth_provider = plugin.AuthenticationProvider.get(None)
- c.user.backfill_login_details(auth_provider)
-
- details = M.UserLoginDetails.query.find({'user_id': c.user._id}).sort('ua').all()
- assert len(details) == 2, details
- assert details[0].ip == '127.0.0.1'
- assert details[0].ua == 'TestBrowser/56'
- assert details[1].ip == '127.0.0.1'
- assert details[1].ua == 'TestBrowser/57'
+class TestAuth:
+
+ def setup_method(self):
+ setup_basic_test()
+ setup_global_objects()
+
+ def test_email_address(self):
+ addr = M.EmailAddress(email='test_admin@domain.net',
+ claimed_by_user_id=c.user._id)
+ ThreadLocalORMSession.flush_all()
+ assert addr.claimed_by_user() == c.user
+ addr2 = M.EmailAddress.create('test@domain.net')
+ addr3 = M.EmailAddress.create('test_admin@domain.net')
+ ThreadLocalORMSession.flush_all()
+
+ # Duplicate emails are allowed, until the email is confirmed
+ assert addr3 is not addr
+
+ assert addr2 is not addr
+ assert addr2
+ addr4 = M.EmailAddress.create('test@DOMAIN.NET')
+ assert addr4 is not addr2
+
+ assert addr is c.user.address_object('test_admin@domain.net')
+ c.user.claim_address('test@DOMAIN.NET')
+ assert 'test@domain.net' in c.user.email_addresses
+
+ def test_email_address_lookup_helpers(self):
+ # EmailAddress.get()/find() are expected to match on the canonical form:
+ # the domain is case-insensitive, the local part stays case-sensitive.
+ addr = M.EmailAddress.create('TEST@DOMAIN.NET')
+ nobody = M.EmailAddress.create('nobody@example.com')
+ ThreadLocalORMSession.flush_all()
+ assert addr.email == 'TEST@domain.net'
+
+ assert M.EmailAddress.get(email='TEST@DOMAIN.NET') == addr
+ assert M.EmailAddress.get(email='TEST@domain.net') == addr
+ assert M.EmailAddress.get(email='test@domain.net') is None
+ assert M.EmailAddress.get(email=None) is None
+ assert M.EmailAddress.get(email='nobody@example.com') == nobody
+ # invalid email returns None, but not nobody@example.com as before
+ assert M.EmailAddress.get(email='invalid') is None
+
+ assert M.EmailAddress.find(dict(email='TEST@DOMAIN.NET')).all() == [addr]
+ assert M.EmailAddress.find(dict(email='TEST@domain.net')).all() == [addr]
+ assert M.EmailAddress.find(dict(email='test@domain.net')).all() == []
+ assert M.EmailAddress.find(dict(email=None)).all() == []
+ assert M.EmailAddress.find(dict(email='nobody@example.com')).all() == [nobody]
+ # invalid email returns empty query, but not nobody@example.com as before
+ assert M.EmailAddress.find(dict(email='invalid')).all() == []
+
+ def test_email_address_canonical(self):
+ # canonical() is expected to lower-case the domain, strip surrounding
+ # whitespace and any display name, and return None for non-addresses.
+ assert M.EmailAddress.canonical('nobody@EXAMPLE.COM') == \
+ 'nobody@example.com'
+ assert M.EmailAddress.canonical('nobody@example.com') == \
+ 'nobody@example.com'
+ assert M.EmailAddress.canonical('I Am Nobody <nobody@example.com>') == \
+ 'nobody@example.com'
+ assert M.EmailAddress.canonical(' nobody@example.com\t') == \
+ 'nobody@example.com'
+ assert M.EmailAddress.canonical('I Am@Nobody <nobody@example.com> ') == \
+ 'nobody@example.com'
+ assert M.EmailAddress.canonical(' No@body <no@body@example.com> ') == \
+ 'no@body@example.com'
+ assert M.EmailAddress.canonical('no@body@example.com') == \
+ 'no@body@example.com'
+ assert M.EmailAddress.canonical('invalid') is None
+ def test_email_address_send_verification_link(self):
+ addr = M.EmailAddress(email='test_admin@domain.net',
+ claimed_by_user_id=c.user._id)
+
+ addr.send_verification_link()
+
+ with patch('allura.tasks.mail_tasks.smtp_client._client') as _client:
+ M.MonQTask.run_ready()
+ return_path, rcpts, body = _client.sendmail.call_args[0]
+ assert rcpts == ['test_admin@domain.net']
+
+ @td.with_user_project('test-admin')
+ def test_user(self):
+ assert c.user.url() .endswith('/u/test-admin/')
+ assert c.user.script_name .endswith('/u/test-admin/')
+ assert ({p.shortname for p in c.user.my_projects()} ==
+ {'test', 'test2', 'u/test-admin', 'adobe-1', '--init--'})
+ # delete one of the projects and make sure it won't appear in my_projects()
+ p = M.Project.query.get(shortname='test2')
+ p.deleted = True
+ ThreadLocalORMSession.flush_all()
+ assert ({p.shortname for p in c.user.my_projects()} ==
+ {'test', 'u/test-admin', 'adobe-1', '--init--'})
+ u = M.User.register(dict(
+ username='nosetest-user'))
+ ThreadLocalORMSession.flush_all()
+ assert u.reg_date
+ assert u.private_project().shortname == 'u/nosetest-user'
+ roles = g.credentials.user_roles(
+ u._id, project_id=u.private_project().root_project._id)
+ assert len(roles) == 3, roles
+ u.set_password('foo')
+ provider = plugin.LocalAuthenticationProvider(Request.blank('/'))
+ assert provider._validate_password(u, 'foo')
+ assert not provider._validate_password(u, 'foobar')
+ u.set_password('foobar')
+ assert provider._validate_password(u, 'foobar')
+ assert not provider._validate_password(u, 'foo')
+
+ def test_user_project_creates_on_demand(self):
+ u = M.User.register(dict(username='foobar123'), make_project=False)
+ ThreadLocalORMSession.flush_all()
+ assert not M.Project.query.get(shortname='u/foobar123')
+ assert u.private_project()
+ assert M.Project.query.get(shortname='u/foobar123')
+
+ def test_user_project_already_deleted_creates_on_demand(self):
+ u = M.User.register(dict(username='foobar123'), make_project=True)
+ p = M.Project.query.get(shortname='u/foobar123')
+ p.deleted = True
+ ThreadLocalORMSession.flush_all()
+ assert not M.Project.query.get(shortname='u/foobar123', deleted=False)
+ assert u.private_project()
+ ThreadLocalORMSession.flush_all()
+ assert M.Project.query.get(shortname='u/foobar123', deleted=False)
+
+ def test_user_project_does_not_create_on_demand_for_disabled_user(self):
+ u = M.User.register(
+ dict(username='foobar123', disabled=True), make_project=False)
+ ThreadLocalORMSession.flush_all()
+ assert not u.private_project()
+ assert not M.Project.query.get(shortname='u/foobar123')
+
+ def test_user_project_does_not_create_on_demand_for_anonymous_user(self):
+ u = M.User.anonymous()
+ ThreadLocalORMSession.flush_all()
+ assert not u.private_project()
+ assert not M.Project.query.get(shortname='u/anonymous')
+ assert not M.Project.query.get(shortname='u/*anonymous')
+
+ @patch('allura.model.auth.log')
+ def test_user_by_email_address(self, log):
+ u1 = M.User.register(dict(username='abc1'), make_project=False)
+ u2 = M.User.register(dict(username='abc2'), make_project=False)
+ addr1 = M.EmailAddress(email='abc123@abc.me', confirmed=True,
+ claimed_by_user_id=u1._id)
+ addr2 = M.EmailAddress(email='abc123@abc.me', confirmed=True,
+ claimed_by_user_id=u2._id)
+ # both users are disabled
+ u1.disabled, u2.disabled = True, True
+ ThreadLocalORMSession.flush_all()
+ assert M.User.by_email_address('abc123@abc.me') is None
+ assert log.warn.call_count == 0
+
+ # only u2 is active
+ u1.disabled, u2.disabled = True, False
+ ThreadLocalORMSession.flush_all()
+ assert M.User.by_email_address('abc123@abc.me') == u2
+ assert log.warn.call_count == 0
+
+ # both are active
+ u1.disabled, u2.disabled = False, False
+ ThreadLocalORMSession.flush_all()
+ assert M.User.by_email_address('abc123@abc.me') in [u1, u2]
+ assert log.warn.call_count == 1
+
+ # invalid email returns None, but not user which claimed
+ # nobody@example.com as before
+ nobody = M.EmailAddress(email='nobody@example.com', confirmed=True,
+ claimed_by_user_id=u1._id)
+ ThreadLocalORMSession.flush_all()
+ assert M.User.by_email_address('nobody@example.com') == u1
+ assert M.User.by_email_address('invalid') is None
+
+ def test_user_equality(self):
+ assert M.User.by_username('test-user') == M.User.by_username('test-user')
+ assert M.User.anonymous() == M.User.anonymous()
+ assert M.User.by_username('*anonymous') == M.User.anonymous()
+
+ assert M.User.by_username('test-user') != M.User.by_username('test-admin')
+ assert M.User.by_username('test-user') != M.User.anonymous()
+ assert M.User.anonymous() is not None
+ assert M.User.anonymous() != 12345
+ assert M.User.anonymous() != M.User()
+
+ def test_user_hash(self):
+ assert M.User.by_username('test-user') in {M.User.by_username('test-user')}
+ assert M.User.anonymous() in {M.User.anonymous()}
+ assert M.User.by_username('*anonymous') in {M.User.anonymous()}
+
+ assert M.User.by_username('test-user') not in {M.User.by_username('test-admin')}
+ assert M.User.anonymous() not in {M.User.by_username('test-admin')}
+ assert M.User.anonymous() not in {0, None}
+
+ def test_project_role(self):
+ role = M.ProjectRole(project_id=c.project._id, name='test_role')
+ M.ProjectRole.by_user(c.user, upsert=True).roles.append(role._id)
+ ThreadLocalORMSession.flush_all()
+ roles = g.credentials.user_roles(
+ c.user._id, project_id=c.project.root_project._id)
+ roles_ids = [r['_id'] for r in roles]
+ roles = M.ProjectRole.query.find({'_id': {'$in': roles_ids}})
+ for pr in roles:
+ assert pr.display()
+ pr.special
+ assert pr.user in (c.user, None, M.User.anonymous())
+
+ def test_default_project_roles(self):
+ roles = {
+ pr.name: pr
+ for pr in M.ProjectRole.query.find(dict(
+ project_id=c.project._id)).all()
+ if pr.name}
+ assert 'Admin' in list(roles.keys()), list(roles.keys())
+ assert 'Developer' in list(roles.keys()), list(roles.keys())
+ assert 'Member' in list(roles.keys()), list(roles.keys())
+ assert roles['Developer']._id in roles['Admin'].roles
+ assert roles['Member']._id in roles['Developer'].roles
+
+ # There is 1 user assigned to the project, represented by a
+ # relational (vs. named) ProjectRole
+ assert len(roles) == M.ProjectRole.query.find(dict(
+ project_id=c.project._id)).count() - 1
+
+ def test_email_address_claimed_by_user(self):
+ addr = M.EmailAddress(email='test_admin@domain.net',
+ claimed_by_user_id=c.user._id)
+ c.user.disabled = True
+ ThreadLocalORMSession.flush_all()
+ assert addr.claimed_by_user() is None
+
+ @td.with_user_project('test-admin')
+ def test_user_projects_by_role(self):
+ assert ({p.shortname for p in c.user.my_projects()} ==
+ {'test', 'test2', 'u/test-admin', 'adobe-1', '--init--'})
+ assert ({p.shortname for p in c.user.my_projects_by_role_name('Admin')} ==
+ {'test', 'test2', 'u/test-admin', 'adobe-1', '--init--'})
+ # Remove admin access from c.user to test2 project
+ project = M.Project.query.get(shortname='test2')
+ admin_role = M.ProjectRole.by_name('Admin', project)
+ developer_role = M.ProjectRole.by_name('Developer', project)
+ user_role = M.ProjectRole.by_user(c.user, project=project, upsert=True)
+ user_role.roles.remove(admin_role._id)
+ user_role.roles.append(developer_role._id)
+ ThreadLocalORMSession.flush_all()
+ g.credentials.clear()
+ assert ({p.shortname for p in c.user.my_projects()} ==
+ {'test', 'test2', 'u/test-admin', 'adobe-1', '--init--'})
+ assert ({p.shortname for p in c.user.my_projects_by_role_name('Admin')} ==
+ {'test', 'u/test-admin', 'adobe-1', '--init--'})
+
+ @td.with_user_project('test-admin')
+ def test_user_projects_unnamed(self):
+ """
+ Confirm that spurious ProjectRoles associating a user with
+ a project to which they do not belong to any named group
+ don't cause the user to count as a member of the project.
+ """
+ sub1 = M.Project.query.get(shortname='test/sub1')
+ M.ProjectRole(
+ user_id=c.user._id,
+ project_id=sub1._id)
+ ThreadLocalORMSession.flush_all()
+ project_names = [p.shortname for p in c.user.my_projects()]
+ assert 'test/sub1' not in project_names
+ assert 'test' in project_names
+
+ @patch.object(g, 'user_message_max_messages', 3)
+ def test_check_sent_user_message_times(self):
+ user1 = M.User.register(dict(username='test-user'), make_project=False)
+ time1 = datetime.utcnow() - timedelta(minutes=30)
+ time2 = datetime.utcnow() - timedelta(minutes=45)
+ time3 = datetime.utcnow() - timedelta(minutes=70)
+ user1.sent_user_message_times = [time1, time2, time3]
+ assert user1.can_send_user_message()
+ assert len(user1.sent_user_message_times) == 2
+ user1.sent_user_message_times.append(
+ datetime.utcnow() - timedelta(minutes=15))
+ assert not user1.can_send_user_message()
+
+ @td.with_user_project('test-admin')
+ def test_user_track_active(self):
+ # without this, session flushing inside track_active raises an Exception
+ setup_functional_test()
+ c.user = M.User.by_username('test-admin')
+
+ assert c.user.last_access['session_date'] is None
+ assert c.user.last_access['session_ip'] is None
+ assert c.user.last_access['session_ua'] is None
+
+ req = Mock(headers={'User-Agent': 'browser'}, remote_addr='addr')
+ c.user.track_active(req)
+ c.user = M.User.by_username(c.user.username)
+ assert c.user.last_access['session_date'] is not None
+ assert c.user.last_access['session_ip'] == 'addr'
+ assert c.user.last_access['session_ua'] == 'browser'
+
+ # ensure that session activity tracked with a whole-day granularity
+ prev_date = c.user.last_access['session_date']
+ c.user.track_active(req)
+ c.user = M.User.by_username(c.user.username)
+ assert c.user.last_access['session_date'] == prev_date
+ yesterday = datetime.utcnow() - timedelta(1)
+ c.user.last_access['session_date'] = yesterday
+ session(c.user).flush(c.user)
+ c.user.track_active(req)
+ c.user = M.User.by_username(c.user.username)
+ assert c.user.last_access['session_date'] > yesterday
+
+ # ...or if IP or User Agent has changed
+ req.remote_addr = 'new addr'
+ c.user.track_active(req)
+ c.user = M.User.by_username(c.user.username)
+ assert c.user.last_access['session_ip'] == 'new addr'
+ assert c.user.last_access['session_ua'] == 'browser'
+ req.headers['User-Agent'] = 'new browser'
+ c.user.track_active(req)
+ c.user = M.User.by_username(c.user.username)
+ assert c.user.last_access['session_ip'] == 'new addr'
+ assert c.user.last_access['session_ua'] == 'new browser'
+
+ def test_user_index(self):
+ c.user.email_addresses = ['email1', 'email2']
+ c.user.set_pref('email_address', 'email2')
+ idx = c.user.index()
+ assert idx['id'] == c.user.index_id()
+ assert idx['title'] == 'User test-admin'
+ assert idx['type_s'] == 'User'
+ assert idx['username_s'] == 'test-admin'
+ assert idx['email_addresses_t'] == 'email1 email2'
+ assert idx['email_address_s'] == 'email2'
+ assert 'last_password_updated_dt' in idx
+ assert idx['disabled_b'] is False
+ assert 'results_per_page_i' in idx
+ assert 'email_format_s' in idx
+ assert 'disable_user_messages_b' in idx
+ assert idx['display_name_t'] == 'Test Admin'
+ assert idx['sex_s'] == 'Unknown'
+ assert 'birthdate_dt' in idx
+ assert 'localization_s' in idx
+ assert 'timezone_s' in idx
+ assert 'socialnetworks_t' in idx
+ assert 'telnumbers_t' in idx
+ assert 'skypeaccount_s' in idx
+ assert 'webpages_t' in idx
+ assert 'skills_t' in idx
+ assert 'last_access_login_date_dt' in idx
+ assert 'last_access_login_ip_s' in idx
+ assert 'last_access_login_ua_t' in idx
+ assert 'last_access_session_date_dt' in idx
+ assert 'last_access_session_ip_s' in idx
+ assert 'last_access_session_ua_t' in idx
+ # provided by auth provider
+ assert 'user_registration_date_dt' in idx
+
+ def test_user_index_none_values(self):
+ c.user.email_addresses = [None]
+ c.user.set_pref('telnumbers', [None])
+ c.user.set_pref('webpages', [None])
+ idx = c.user.index()
+ assert idx['email_addresses_t'] == ''
+ assert idx['telnumbers_t'] == ''
+ assert idx['webpages_t'] == ''
+
+ def test_user_backfill_login_details(self):
+ with h.push_config(r, user_agent='TestBrowser/55'):
+ # these shouldn't match
+ h.auditlog_user('something happened')
+ h.auditlog_user('blah blah Password changed')
+ with h.push_config(r, user_agent='TestBrowser/56'):
+ # these should all match, but only one entry created for this ip/ua
+ h.auditlog_user('Account activated')
+ h.auditlog_user('Successful login')
+ h.auditlog_user('Password changed')
+ with h.push_config(r, user_agent='TestBrowser/57'):
+ # this should match too
+ h.auditlog_user('Set up multifactor TOTP')
+ ThreadLocalORMSession.flush_all()
+
+ auth_provider = plugin.AuthenticationProvider.get(None)
+ c.user.backfill_login_details(auth_provider)
+
+ details = M.UserLoginDetails.query.find({'user_id': c.user._id}).sort('ua').all()
+ assert len(details) == 2, details
+ assert details[0].ip == '127.0.0.1'
+ assert details[0].ua == 'TestBrowser/56'
+ assert details[1].ip == '127.0.0.1'
+ assert details[1].ua == 'TestBrowser/57'
@with_nose_compatibility
class TestAuditLog:
+ @classmethod
+ def setup_class(cls):
+ setup_basic_test()
+ setup_global_objects()
+
def test_message_html(self):
al = h.auditlog_user('our message <script>alert(1)</script>')
assert al.message == textwrap.dedent('''\
diff --git a/Allura/allura/tests/model/test_discussion.py b/Allura/allura/tests/model/test_discussion.py
index d5abf1a05..3c158cd3f 100644
--- a/Allura/allura/tests/model/test_discussion.py
+++ b/Allura/allura/tests/model/test_discussion.py
@@ -24,10 +24,8 @@ from datetime import datetime, timedelta
from cgi import FieldStorage
from tg import tmpl_context as c
-from alluratest.tools import assert_equals, with_setup
import mock
from mock import patch
-from alluratest.tools import assert_equal, assert_in
from ming.orm import session, ThreadLocalORMSession
from webob import exc
@@ -38,513 +36,469 @@ from allura.tests import TestController
from alluratest.controller import setup_global_objects
-def setup_method():
- controller = TestController()
- controller.setup_method(None)
- controller.app.get('/wiki/Home/')
- setup_global_objects()
- ThreadLocalORMSession.close_all()
- h.set_context('test', 'wiki', neighborhood='Projects')
- ThreadLocalORMSession.flush_all()
- ThreadLocalORMSession.close_all()
-
-
-def teardown_module():
- ThreadLocalORMSession.close_all()
-
-
-@with_setup(setup_method)
-def test_discussion_methods():
- d = M.Discussion(shortname='test', name='test')
- assert d.thread_class() == M.Thread
- assert d.post_class() == M.Post
- assert d.attachment_class() == M.DiscussionAttachment
- ThreadLocalORMSession.flush_all()
- d.update_stats()
- ThreadLocalORMSession.flush_all()
- assert d.last_post is None
- assert d.url().endswith('wiki/_discuss/')
- assert d.index()['name_s'] == 'test'
- assert d.find_posts().count() == 0
- jsn = d.__json__()
- assert jsn['name'] == d.name
- d.delete()
- ThreadLocalORMSession.flush_all()
- ThreadLocalORMSession.close_all()
-
-
-@with_setup(setup_method)
-def test_thread_methods():
- d = M.Discussion(shortname='test', name='test')
- t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
- assert t.discussion_class() == M.Discussion
- assert t.post_class() == M.Post
- assert t.attachment_class() == M.DiscussionAttachment
- p0 = t.post('This is a post')
- p1 = t.post('This is another post')
- time.sleep(0.25)
- t.post('This is a reply', parent_id=p0._id)
- ThreadLocalORMSession.flush_all()
- ThreadLocalORMSession.close_all()
- d = M.Discussion.query.get(shortname='test')
- t = d.threads[0]
- assert d.last_post is not None
- assert t.last_post is not None
- t.create_post_threads(t.posts)
- posts0 = t.find_posts(page=0, limit=10, style='threaded')
- posts1 = t.find_posts(page=0, limit=10, style='timestamp')
- assert posts0 != posts1
- ts = p0.timestamp.replace(
- microsecond=int(p0.timestamp.microsecond // 1000) * 1000)
- posts2 = t.find_posts(page=0, limit=10, style='threaded', timestamp=ts)
- assert len(posts2) > 0
-
- assert 'wiki/_discuss/' in t.url()
- assert t.index()['views_i'] == 0
- assert t.post_count == 3
- jsn = t.__json__()
- assert '_id' in jsn
- assert len(jsn['posts']) == 3
- (p.approve() for p in (p0, p1))
- ThreadLocalORMSession.flush_all()
- assert t.num_replies == 3
- t.spam()
- assert t.num_replies == 0
- ThreadLocalORMSession.flush_all()
- assert len(t.find_posts()) == 0
- t.delete()
-
-
-@with_setup(setup_method)
-def test_thread_new():
- with mock.patch('allura.model.discuss.h.nonce') as nonce:
- nonce.side_effect = ['deadbeef', 'deadbeef', 'beefdead']
+class TestDiscussion:
+
+ def setup_method(self):
+ controller = TestController()
+ controller.setup_method(None)
+ controller.app.get('/wiki/Home/')
+ setup_global_objects()
+ ThreadLocalORMSession.close_all()
+ h.set_context('test', 'wiki', neighborhood='Projects')
+ ThreadLocalORMSession.flush_all()
+ ThreadLocalORMSession.close_all()
+
+ @classmethod
+ def teardown_class(cls):
+ ThreadLocalORMSession.close_all()
+
+ def test_discussion_methods(self):
d = M.Discussion(shortname='test', name='test')
- t1 = M.Thread.new(discussion_id=d._id, subject='Test Thread One')
- t2 = M.Thread.new(discussion_id=d._id, subject='Test Thread Two')
+ assert d.thread_class() == M.Thread
+ assert d.post_class() == M.Post
+ assert d.attachment_class() == M.DiscussionAttachment
ThreadLocalORMSession.flush_all()
- session(t1).expunge(t1)
- session(t2).expunge(t2)
- t1_2 = M.Thread.query.get(_id=t1._id)
- t2_2 = M.Thread.query.get(_id=t2._id)
- assert t1._id == 'deadbeef'
- assert t2._id == 'beefdead'
- assert t1_2.subject == 'Test Thread One'
- assert t2_2.subject == 'Test Thread Two'
-
-
-@with_setup(setup_method)
-def test_post_methods():
- d = M.Discussion(shortname='test', name='test')
- t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
- p = t.post('This is a post')
- p2 = t.post('This is another post')
- assert p.discussion_class() == M.Discussion
- assert p.thread_class() == M.Thread
- assert p.attachment_class() == M.DiscussionAttachment
- p.commit()
- assert p.parent is None
- assert p.subject == 'Test Thread'
- assert p.attachments == []
- assert 'wiki/_discuss' in p.url()
- assert p.reply_subject() == 'Re: Test Thread'
- assert p.link_text() == p.subject
-
- ss = p.history().first()
- assert 'version' in h.get_first(ss.index(), 'title')
- assert '#' in ss.shorthand_id()
-
- jsn = p.__json__()
- assert jsn["thread_id"] == t._id
-
- (p.approve() for p in (p, p2))
- ThreadLocalORMSession.flush_all()
- assert t.num_replies == 2
- p.spam()
- assert t.num_replies == 1
- p.undo('ok')
- assert t.num_replies == 2
- p.delete()
- assert t.num_replies == 1
-
-
-@with_setup(setup_method)
-def test_attachment_methods():
- d = M.Discussion(shortname='test', name='test')
- t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
- p = t.post('This is a post')
- p_att = p.attach('foo.text', BytesIO(b'Hello, world!'),
- discussion_id=d._id,
- thread_id=t._id,
- post_id=p._id)
- t_att = p.attach('foo2.text', BytesIO(b'Hello, thread!'),
- discussion_id=d._id,
- thread_id=t._id)
- d_att = p.attach('foo3.text', BytesIO(b'Hello, discussion!'),
- discussion_id=d._id)
-
- ThreadLocalORMSession.flush_all()
- assert p_att.post == p
- assert p_att.thread == t
- assert p_att.discussion == d
- for att in (p_att, t_att, d_att):
- assert 'wiki/_discuss' in att.url()
- assert 'attachment/' in att.url()
-
- # Test notification in mail
- t = M.Thread.new(discussion_id=d._id, subject='Test comment notification')
- fs = FieldStorage()
- fs.name = 'file_info'
- fs.filename = 'fake.txt'
- fs.type = 'text/plain'
- fs.file = BytesIO(b'this is the content of the fake file\n')
- p = t.post(text='test message', forum=None, subject='', file_info=fs)
- ThreadLocalORMSession.flush_all()
- n = M.Notification.query.get(
- subject='[test:wiki] Test comment notification')
- url = h.absurl(f'{p.url()}attachment/{fs.filename}')
- assert (
- '\nAttachments:\n\n'
- '- [fake.txt]({}) (37 Bytes; text/plain)'.format(url) in
- n.text)
-
-
-@with_setup(setup_method)
-def test_multiple_attachments():
- test_file1 = FieldStorage()
- test_file1.name = 'file_info'
- test_file1.filename = 'test1.txt'
- test_file1.type = 'text/plain'
- test_file1.file = BytesIO(b'test file1\n')
- test_file2 = FieldStorage()
- test_file2.name = 'file_info'
- test_file2.filename = 'test2.txt'
- test_file2.type = 'text/plain'
- test_file2.file = BytesIO(b'test file2\n')
- d = M.Discussion(shortname='test', name='test')
- t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
- test_post = t.post('test post')
- test_post.add_multiple_attachments([test_file1, test_file2])
- ThreadLocalORMSession.flush_all()
- assert len(test_post.attachments) == 2
- attaches = test_post.attachments
- assert 'test1.txt' in [attaches[0].filename, attaches[1].filename]
- assert 'test2.txt' in [attaches[0].filename, attaches[1].filename]
-
-
-@with_setup(setup_method)
-def test_add_attachment():
- test_file = FieldStorage()
- test_file.name = 'file_info'
- test_file.filename = 'test.txt'
- test_file.type = 'text/plain'
- test_file.file = BytesIO(b'test file\n')
- d = M.Discussion(shortname='test', name='test')
- t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
- test_post = t.post('test post')
- test_post.add_attachment(test_file)
- ThreadLocalORMSession.flush_all()
- assert len(test_post.attachments) == 1
- attach = test_post.attachments[0]
- assert attach.filename == 'test.txt', attach.filename
- assert attach.content_type == 'text/plain', attach.content_type
-
-
-def test_notification_two_attaches():
- d = M.Discussion(shortname='test', name='test')
- t = M.Thread.new(discussion_id=d._id, subject='Test comment notification')
- fs1 = FieldStorage()
- fs1.name = 'file_info'
- fs1.filename = 'fake.txt'
- fs1.type = 'text/plain'
- fs1.file = BytesIO(b'this is the content of the fake file\n')
- fs2 = FieldStorage()
- fs2.name = 'file_info'
- fs2.filename = 'fake2.txt'
- fs2.type = 'text/plain'
- fs2.file = BytesIO(b'this is the content of the fake file\n')
- p = t.post(text='test message', forum=None, subject='', file_info=[fs1, fs2])
- ThreadLocalORMSession.flush_all()
- n = M.Notification.query.get(
- subject='[test:wiki] Test comment notification')
- base_url = h.absurl(f'{p.url()}attachment/')
- assert (
- '\nAttachments:\n\n'
- '- [fake.txt]({0}fake.txt) (37 Bytes; text/plain)\n'
- '- [fake2.txt]({0}fake2.txt) (37 Bytes; text/plain)'.format(base_url) in
- n.text)
-
-
-@with_setup(setup_method)
-def test_discussion_delete():
- d = M.Discussion(shortname='test', name='test')
- t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
- p = t.post('This is a post')
- p.attach('foo.text', BytesIO(b''),
- discussion_id=d._id,
- thread_id=t._id,
- post_id=p._id)
- M.ArtifactReference.from_artifact(d)
- rid = d.index_id()
- ThreadLocalORMSession.flush_all()
- d.delete()
- ThreadLocalORMSession.flush_all()
- assert M.ArtifactReference.query.find(dict(_id=rid)).count() == 0
-
-
-@with_setup(setup_method)
-def test_thread_delete():
- d = M.Discussion(shortname='test', name='test')
- t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
- p = t.post('This is a post')
- p.attach('foo.text', BytesIO(b''),
- discussion_id=d._id,
- thread_id=t._id,
- post_id=p._id)
- ThreadLocalORMSession.flush_all()
- t.delete()
-
-
-@with_setup(setup_method)
-def test_post_delete():
- d = M.Discussion(shortname='test', name='test')
- t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
- p = t.post('This is a post')
- p.attach('foo.text', BytesIO(b''),
- discussion_id=d._id,
- thread_id=t._id,
- post_id=p._id)
- ThreadLocalORMSession.flush_all()
- p.delete()
-
-
-@with_setup(setup_method)
-def test_post_undo():
- d = M.Discussion(shortname='test', name='test')
- t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
- p = t.post('This is a post')
- t.post('This is a post2')
- t.post('This is a post3')
- ThreadLocalORMSession.flush_all()
- assert t.num_replies == 3
- p.spam()
- assert t.num_replies == 2
- p.undo('ok')
- assert t.num_replies == 3
-
-
-@with_setup(setup_method)
-def test_post_permission_check():
- d = M.Discussion(shortname='test', name='test')
- t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
- c.user = M.User.anonymous()
- try:
- t.post('This post will fail the check.')
- assert False, "Expected an anonymous post to fail."
- except exc.HTTPUnauthorized:
- pass
- t.post('This post will pass the check.', ignore_security=True)
-
-
-@with_setup(setup_method)
-def test_post_url_paginated():
- d = M.Discussion(shortname='test', name='test')
- t = M.Thread(discussion_id=d._id, subject='Test Thread')
- p = [] # posts in display order
- ts = datetime.utcnow() - timedelta(days=1)
- for i in range(5):
- ts += timedelta(minutes=1)
- p.append(t.post('This is a post #%s' % i, timestamp=ts))
-
- ts += timedelta(minutes=1)
- p.insert(1, t.post(
- 'This is reply #0 to post #0', parent_id=p[0]._id, timestamp=ts))
-
- ts += timedelta(minutes=1)
- p.insert(2, t.post(
- 'This is reply #1 to post #0', parent_id=p[0]._id, timestamp=ts))
-
- ts += timedelta(minutes=1)
- p.insert(4, t.post(
- 'This is reply #0 to post #1', parent_id=p[3]._id, timestamp=ts))
-
- ts += timedelta(minutes=1)
- p.insert(6, t.post(
- 'This is reply #0 to post #2', parent_id=p[5]._id, timestamp=ts))
-
- ts += timedelta(minutes=1)
- p.insert(7, t.post(
- 'This is reply #1 to post #2', parent_id=p[5]._id, timestamp=ts))
-
- ts += timedelta(minutes=1)
- p.insert(8, t.post(
- 'This is reply #0 to reply #1 to post #2',
- parent_id=p[7]._id, timestamp=ts))
-
- # with default paging limit
- for _p in p:
- url = t.url() + '?limit=25#' + _p.slug
- assert _p.url_paginated() == url, _p.url_paginated()
-
- # with user paging limit
- limit = 3
- c.user.set_pref('results_per_page', limit)
- for i, _p in enumerate(p):
- page = i // limit
- url = t.url() + '?limit=%s' % limit
- if page > 0:
- url += '&page=%s' % page
- url += '#' + _p.slug
- assert _p.url_paginated() == url
-
-
-@with_setup(setup_method)
-def test_post_url_paginated_with_artifact():
- """Post.url_paginated should return link to attached artifact, if any"""
- from forgewiki.model import Page
- page = Page.upsert(title='Test Page')
- thread = page.discussion_thread
- comment = thread.post('Comment')
- url = page.url() + '?limit=25#' + comment.slug
- assert comment.url_paginated() == url
-
-
-@with_setup(setup_method)
-def test_post_notify():
- d = M.Discussion(shortname='test', name='test')
- d.monitoring_email = 'darthvader@deathstar.org'
- t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
- with patch('allura.model.notification.Notification.send_simple') as send:
- t.post('This is a post')
- send.assert_called_with(d.monitoring_email)
+ d.update_stats()
+ ThreadLocalORMSession.flush_all()
+ assert d.last_post is None
+ assert d.url().endswith('wiki/_discuss/')
+ assert d.index()['name_s'] == 'test'
+ assert d.find_posts().count() == 0
+ jsn = d.__json__()
+ assert jsn['name'] == d.name
+ d.delete()
+ ThreadLocalORMSession.flush_all()
+ ThreadLocalORMSession.close_all()
+
+ def test_thread_methods(self):
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+ assert t.discussion_class() == M.Discussion
+ assert t.post_class() == M.Post
+ assert t.attachment_class() == M.DiscussionAttachment
+ p0 = t.post('This is a post')
+ p1 = t.post('This is another post')
+ # presumably separates the reply's timestamp from the earlier posts - TODO confirm
+ time.sleep(0.25)
+ t.post('This is a reply', parent_id=p0._id)
+ ThreadLocalORMSession.flush_all()
+ ThreadLocalORMSession.close_all()
+ d = M.Discussion.query.get(shortname='test')
+ t = d.threads[0]
+ assert d.last_post is not None
+ assert t.last_post is not None
+ t.create_post_threads(t.posts)
+ posts0 = t.find_posts(page=0, limit=10, style='threaded')
+ posts1 = t.find_posts(page=0, limit=10, style='timestamp')
+ assert posts0 != posts1
+ ts = p0.timestamp.replace(
+ microsecond=int(p0.timestamp.microsecond // 1000) * 1000)
+ posts2 = t.find_posts(page=0, limit=10, style='threaded', timestamp=ts)
+ assert len(posts2) > 0
+
+ assert 'wiki/_discuss/' in t.url()
+ assert t.index()['views_i'] == 0
+ assert t.post_count == 3
+ jsn = t.__json__()
+ assert '_id' in jsn
+ assert len(jsn['posts']) == 3
+ # NOTE(review): this generator expression is never iterated, so approve()
+ # is not actually called on p0/p1 here.
+ (p.approve() for p in (p0, p1))
+ ThreadLocalORMSession.flush_all()
+ assert t.num_replies == 3
+ t.spam()
+ assert t.num_replies == 0
+ ThreadLocalORMSession.flush_all()
+ assert len(t.find_posts()) == 0
+ t.delete()
+
+ def test_thread_new(self):
+ with mock.patch('allura.model.discuss.h.nonce') as nonce:
+ nonce.side_effect = ['deadbeef', 'deadbeef', 'beefdead']
+ d = M.Discussion(shortname='test', name='test')
+ t1 = M.Thread.new(discussion_id=d._id, subject='Test Thread One')
+ t2 = M.Thread.new(discussion_id=d._id, subject='Test Thread Two')
+ ThreadLocalORMSession.flush_all()
+ session(t1).expunge(t1)
+ session(t2).expunge(t2)
+ t1_2 = M.Thread.query.get(_id=t1._id)
+ t2_2 = M.Thread.query.get(_id=t2._id)
+ assert t1._id == 'deadbeef'
+ assert t2._id == 'beefdead'
+ assert t1_2.subject == 'Test Thread One'
+ assert t2_2.subject == 'Test Thread Two'
+
+ def test_post_methods(self):
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+ p = t.post('This is a post')
+ p2 = t.post('This is another post')
+ assert p.discussion_class() == M.Discussion
+ assert p.thread_class() == M.Thread
+ assert p.attachment_class() == M.DiscussionAttachment
+ p.commit()
+ assert p.parent is None
+ assert p.subject == 'Test Thread'
+ assert p.attachments == []
+ assert 'wiki/_discuss' in p.url()
+ assert p.reply_subject() == 'Re: Test Thread'
+ assert p.link_text() == p.subject
+
+ ss = p.history().first()
+ assert 'version' in h.get_first(ss.index(), 'title')
+ assert '#' in ss.shorthand_id()
+
+ jsn = p.__json__()
+ assert jsn["thread_id"] == t._id
+
+ (p.approve() for p in (p, p2))
+ ThreadLocalORMSession.flush_all()
+ assert t.num_replies == 2
+ p.spam()
+ assert t.num_replies == 1
+ p.undo('ok')
+ assert t.num_replies == 2
+ p.delete()
+ assert t.num_replies == 1
+
+ def test_attachment_methods(self):
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+ p = t.post('This is a post')
+ p_att = p.attach('foo.text', BytesIO(b'Hello, world!'),
+ discussion_id=d._id,
+ thread_id=t._id,
+ post_id=p._id)
+ t_att = p.attach('foo2.text', BytesIO(b'Hello, thread!'),
+ discussion_id=d._id,
+ thread_id=t._id)
+ d_att = p.attach('foo3.text', BytesIO(b'Hello, discussion!'),
+ discussion_id=d._id)
+
+ ThreadLocalORMSession.flush_all()
+ assert p_att.post == p
+ assert p_att.thread == t
+ assert p_att.discussion == d
+ for att in (p_att, t_att, d_att):
+ assert 'wiki/_discuss' in att.url()
+ assert 'attachment/' in att.url()
+
+ # Test notification in mail
+ t = M.Thread.new(discussion_id=d._id, subject='Test comment notification')
+ fs = FieldStorage()
+ fs.name = 'file_info'
+ fs.filename = 'fake.txt'
+ fs.type = 'text/plain'
+ fs.file = BytesIO(b'this is the content of the fake file\n')
+ p = t.post(text='test message', forum=None, subject='', file_info=fs)
+ ThreadLocalORMSession.flush_all()
+ n = M.Notification.query.get(
+ subject='[test:wiki] Test comment notification')
+ url = h.absurl(f'{p.url()}attachment/{fs.filename}')
+ assert (
+ '\nAttachments:\n\n'
+ '- [fake.txt]({}) (37 Bytes; text/plain)'.format(url) in
+ n.text)
+
+ def test_multiple_attachments(self):
+ test_file1 = FieldStorage()
+ test_file1.name = 'file_info'
+ test_file1.filename = 'test1.txt'
+ test_file1.type = 'text/plain'
+ test_file1.file = BytesIO(b'test file1\n')
+ test_file2 = FieldStorage()
+ test_file2.name = 'file_info'
+ test_file2.filename = 'test2.txt'
+ test_file2.type = 'text/plain'
+ test_file2.file = BytesIO(b'test file2\n')
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+ test_post = t.post('test post')
+ test_post.add_multiple_attachments([test_file1, test_file2])
+ ThreadLocalORMSession.flush_all()
+ assert len(test_post.attachments) == 2
+ attaches = test_post.attachments
+ assert 'test1.txt' in [attaches[0].filename, attaches[1].filename]
+ assert 'test2.txt' in [attaches[0].filename, attaches[1].filename]
+
+ def test_add_attachment(self):
+ test_file = FieldStorage()
+ test_file.name = 'file_info'
+ test_file.filename = 'test.txt'
+ test_file.type = 'text/plain'
+ test_file.file = BytesIO(b'test file\n')
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+ test_post = t.post('test post')
+ test_post.add_attachment(test_file)
+ ThreadLocalORMSession.flush_all()
+ assert len(test_post.attachments) == 1
+ attach = test_post.attachments[0]
+ assert attach.filename == 'test.txt', attach.filename
+ assert attach.content_type == 'text/plain', attach.content_type
+
+ def test_notification_two_attaches(self):
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread.new(discussion_id=d._id, subject='Test comment notification')
+ fs1 = FieldStorage()
+ fs1.name = 'file_info'
+ fs1.filename = 'fake.txt'
+ fs1.type = 'text/plain'
+ fs1.file = BytesIO(b'this is the content of the fake file\n')
+ fs2 = FieldStorage()
+ fs2.name = 'file_info'
+ fs2.filename = 'fake2.txt'
+ fs2.type = 'text/plain'
+ fs2.file = BytesIO(b'this is the content of the fake file\n')
+ p = t.post(text='test message', forum=None, subject='', file_info=[fs1, fs2])
+ ThreadLocalORMSession.flush_all()
+ n = M.Notification.query.get(
+ subject='[test:wiki] Test comment notification')
+ base_url = h.absurl(f'{p.url()}attachment/')
+ assert (
+ '\nAttachments:\n\n'
+ '- [fake.txt]({0}fake.txt) (37 Bytes; text/plain)\n'
+ '- [fake2.txt]({0}fake2.txt) (37 Bytes; text/plain)'.format(base_url) in
+ n.text)
+
+ def test_discussion_delete(self):
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+ p = t.post('This is a post')
+ p.attach('foo.text', BytesIO(b''),
+ discussion_id=d._id,
+ thread_id=t._id,
+ post_id=p._id)
+ M.ArtifactReference.from_artifact(d)
+ rid = d.index_id()
+ ThreadLocalORMSession.flush_all()
+ d.delete()
+ ThreadLocalORMSession.flush_all()
+ assert M.ArtifactReference.query.find(dict(_id=rid)).count() == 0
+
+ def test_thread_delete(self):
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+ p = t.post('This is a post')
+ p.attach('foo.text', BytesIO(b''),
+ discussion_id=d._id,
+ thread_id=t._id,
+ post_id=p._id)
+ ThreadLocalORMSession.flush_all()
+ t.delete()
+
+ def test_post_delete(self):
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+ p = t.post('This is a post')
+ p.attach('foo.text', BytesIO(b''),
+ discussion_id=d._id,
+ thread_id=t._id,
+ post_id=p._id)
+ ThreadLocalORMSession.flush_all()
+ p.delete()
+
+ def test_post_undo(self):
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+ p = t.post('This is a post')
+ t.post('This is a post2')
+ t.post('This is a post3')
+ ThreadLocalORMSession.flush_all()
+ assert t.num_replies == 3
+ p.spam()
+ assert t.num_replies == 2
+ p.undo('ok')
+ assert t.num_replies == 3
- c.app.config.project.notifications_disabled = True
- with patch('allura.model.notification.Notification.send_simple') as send:
- t.post('Another post')
+ def test_post_permission_check(self):
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+ c.user = M.User.anonymous()
try:
+ t.post('This post will fail the check.')
+ assert False, "Expected an anonymous post to fail."
+ except exc.HTTPUnauthorized:
+ pass
+ t.post('This post will pass the check.', ignore_security=True)
+
+ def test_post_url_paginated(self):
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread(discussion_id=d._id, subject='Test Thread')
+ p = [] # posts in display order
+ ts = datetime.utcnow() - timedelta(days=1)
+ for i in range(5):
+ ts += timedelta(minutes=1)
+ p.append(t.post('This is a post #%s' % i, timestamp=ts))
+
+ ts += timedelta(minutes=1)
+ p.insert(1, t.post(
+ 'This is reply #0 to post #0', parent_id=p[0]._id, timestamp=ts))
+
+ ts += timedelta(minutes=1)
+ p.insert(2, t.post(
+ 'This is reply #1 to post #0', parent_id=p[0]._id, timestamp=ts))
+
+ ts += timedelta(minutes=1)
+ p.insert(4, t.post(
+ 'This is reply #0 to post #1', parent_id=p[3]._id, timestamp=ts))
+
+ ts += timedelta(minutes=1)
+ p.insert(6, t.post(
+ 'This is reply #0 to post #2', parent_id=p[5]._id, timestamp=ts))
+
+ ts += timedelta(minutes=1)
+ p.insert(7, t.post(
+ 'This is reply #1 to post #2', parent_id=p[5]._id, timestamp=ts))
+
+ ts += timedelta(minutes=1)
+ p.insert(8, t.post(
+ 'This is reply #0 to reply #1 to post #2',
+ parent_id=p[7]._id, timestamp=ts))
+
+ # with default paging limit
+ for _p in p:
+ url = t.url() + '?limit=25#' + _p.slug
+ assert _p.url_paginated() == url, _p.url_paginated()
+
+ # with user paging limit
+ limit = 3
+ c.user.set_pref('results_per_page', limit)
+ for i, _p in enumerate(p):
+ page = i // limit
+ url = t.url() + '?limit=%s' % limit
+ if page > 0:
+ url += '&page=%s' % page
+ url += '#' + _p.slug
+ assert _p.url_paginated() == url
+
+ def test_post_url_paginated_with_artifact(self):
+ """Post.url_paginated should return link to attached artifact, if any"""
+ from forgewiki.model import Page
+ page = Page.upsert(title='Test Page')
+ thread = page.discussion_thread
+ comment = thread.post('Comment')
+ url = page.url() + '?limit=25#' + comment.slug
+ assert comment.url_paginated() == url
+
+ def test_post_notify(self):
+ d = M.Discussion(shortname='test', name='test')
+ d.monitoring_email = 'darthvader@deathstar.org'
+ t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+ with patch('allura.model.notification.Notification.send_simple') as send:
+ t.post('This is a post')
send.assert_called_with(d.monitoring_email)
- except AssertionError:
- pass # method not called as expected
- else:
- assert False, 'send_simple must not be called'
-
-
-@with_setup(setup_method)
-@patch('allura.model.discuss.c.project.users_with_role')
-def test_is_spam_for_admin(users):
- users.return_value = [c.user, ]
- d = M.Discussion(shortname='test', name='test')
- t = M.Thread(discussion_id=d._id, subject='Test Thread')
- t.post('This is a post')
- post = M.Post.query.get(text='This is a post')
- assert not t.is_spam(post), t.is_spam(post)
-
-
-@with_setup(setup_method)
-@patch('allura.model.discuss.c.project.users_with_role')
-def test_is_spam(role):
- d = M.Discussion(shortname='test', name='test')
- t = M.Thread(discussion_id=d._id, subject='Test Thread')
- role.return_value = []
- with mock.patch('allura.controllers.discuss.g.spam_checker') as spam_checker:
+
+ c.app.config.project.notifications_disabled = True
+ with patch('allura.model.notification.Notification.send_simple') as send:
+ t.post('Another post')
+ try:
+ send.assert_called_with(d.monitoring_email)
+ except AssertionError:
+ pass # method not called as expected
+ else:
+ assert False, 'send_simple must not be called'
+
+ @patch('allura.model.discuss.c.project.users_with_role')
+ def test_is_spam_for_admin(self, users):
+ users.return_value = [c.user, ]
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread(discussion_id=d._id, subject='Test Thread')
+ t.post('This is a post')
+ post = M.Post.query.get(text='This is a post')
+ assert not t.is_spam(post), t.is_spam(post)
+
+ @patch('allura.model.discuss.c.project.users_with_role')
+ def test_is_spam(self, role):
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread(discussion_id=d._id, subject='Test Thread')
+ role.return_value = []
+ with mock.patch('allura.controllers.discuss.g.spam_checker') as spam_checker:
+ spam_checker.check.return_value = True
+ post = mock.Mock()
+ assert t.is_spam(post), t.is_spam(post)
+ assert spam_checker.check.call_count == 1, spam_checker.call_count
+
+ @mock.patch('allura.controllers.discuss.g.spam_checker')
+ def test_not_spam_and_has_unmoderated_post_permission(self, spam_checker):
+ spam_checker.check.return_value = False
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread(discussion_id=d._id, subject='Test Thread')
+ role = M.ProjectRole.by_name('*anonymous')._id
+ post_permission = M.ACE.allow(role, 'post')
+ unmoderated_post_permission = M.ACE.allow(role, 'unmoderated_post')
+ t.acl.append(post_permission)
+ t.acl.append(unmoderated_post_permission)
+ with h.push_config(c, user=M.User.anonymous()):
+ post = t.post('Hey')
+ assert post.status == 'ok'
+
+ @mock.patch('allura.controllers.discuss.g.spam_checker')
+ @mock.patch.object(M.Thread, 'notify_moderators')
+ def test_not_spam_but_has_no_unmoderated_post_permission(self, notify_moderators, spam_checker):
+ spam_checker.check.return_value = False
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread(discussion_id=d._id, subject='Test Thread')
+ role = M.ProjectRole.by_name('*anonymous')._id
+ post_permission = M.ACE.allow(role, 'post')
+ t.acl.append(post_permission)
+ with h.push_config(c, user=M.User.anonymous()):
+ post = t.post('Hey')
+ assert post.status == 'pending'
+ assert notify_moderators.call_count == 1
+
+ @mock.patch('allura.controllers.discuss.g.spam_checker')
+ @mock.patch.object(M.Thread, 'notify_moderators')
+ def test_spam_and_has_unmoderated_post_permission(self, notify_moderators, spam_checker):
spam_checker.check.return_value = True
- post = mock.Mock()
- assert t.is_spam(post), t.is_spam(post)
- assert spam_checker.check.call_count == 1, spam_checker.call_count
-
-
-@with_setup(setup_method)
-@mock.patch('allura.controllers.discuss.g.spam_checker')
-def test_not_spam_and_has_unmoderated_post_permission(spam_checker):
- spam_checker.check.return_value = False
- d = M.Discussion(shortname='test', name='test')
- t = M.Thread(discussion_id=d._id, subject='Test Thread')
- role = M.ProjectRole.by_name('*anonymous')._id
- post_permission = M.ACE.allow(role, 'post')
- unmoderated_post_permission = M.ACE.allow(role, 'unmoderated_post')
- t.acl.append(post_permission)
- t.acl.append(unmoderated_post_permission)
- with h.push_config(c, user=M.User.anonymous()):
- post = t.post('Hey')
- assert post.status == 'ok'
-
-
-@with_setup(setup_method)
-@mock.patch('allura.controllers.discuss.g.spam_checker')
-@mock.patch.object(M.Thread, 'notify_moderators')
-def test_not_spam_but_has_no_unmoderated_post_permission(notify_moderators, spam_checker):
- spam_checker.check.return_value = False
- d = M.Discussion(shortname='test', name='test')
- t = M.Thread(discussion_id=d._id, subject='Test Thread')
- role = M.ProjectRole.by_name('*anonymous')._id
- post_permission = M.ACE.allow(role, 'post')
- t.acl.append(post_permission)
- with h.push_config(c, user=M.User.anonymous()):
- post = t.post('Hey')
- assert post.status == 'pending'
- assert notify_moderators.call_count == 1
-
-
-@with_setup(setup_method)
-@mock.patch('allura.controllers.discuss.g.spam_checker')
-@mock.patch.object(M.Thread, 'notify_moderators')
-def test_spam_and_has_unmoderated_post_permission(notify_moderators, spam_checker):
- spam_checker.check.return_value = True
- d = M.Discussion(shortname='test', name='test')
- t = M.Thread(discussion_id=d._id, subject='Test Thread')
- role = M.ProjectRole.by_name('*anonymous')._id
- post_permission = M.ACE.allow(role, 'post')
- unmoderated_post_permission = M.ACE.allow(role, 'unmoderated_post')
- t.acl.append(post_permission)
- t.acl.append(unmoderated_post_permission)
- with h.push_config(c, user=M.User.anonymous()):
- post = t.post('Hey')
- assert post.status == 'pending'
- assert notify_moderators.call_count == 1
-
-
-@with_setup(setup_method)
-@mock.patch('allura.controllers.discuss.g.spam_checker')
-def test_thread_subject_not_included_in_text_checked(spam_checker):
- spam_checker.check.return_value = False
- d = M.Discussion(shortname='test', name='test')
- t = M.Thread(discussion_id=d._id, subject='Test Thread')
- t.post('Hello')
- assert spam_checker.check.call_count == 1
- assert spam_checker.check.call_args[0][0] == 'Hello'
-
-
-def test_post_count():
- d = M.Discussion(shortname='test', name='test')
- t = M.Thread(discussion_id=d._id, subject='Test Thread')
- M.Post(discussion_id=d._id, thread_id=t._id, status='spam')
- M.Post(discussion_id=d._id, thread_id=t._id, status='ok')
- M.Post(discussion_id=d._id, thread_id=t._id, status='pending')
- ThreadLocalORMSession.flush_all()
- assert t.post_count == 2
-
-
-@mock.patch('allura.controllers.discuss.g.spam_checker')
-def test_spam_num_replies(spam_checker):
- d = M.Discussion(shortname='test', name='test')
- t = M.Thread(discussion_id=d._id, subject='Test Thread', num_replies=2)
- M.Post(discussion_id=d._id, thread_id=t._id, status='ok')
- ThreadLocalORMSession.flush_all()
- p1 = M.Post(discussion_id=d._id, thread_id=t._id, status='spam')
- p1.spam()
- assert t.num_replies == 1
-
-
-def test_deleted_thread_index():
- d = M.Discussion(shortname='test', name='test')
- t = M.Thread(discussion_id=d._id, subject='Test Thread')
- p = M.Post(discussion_id=d._id, thread_id=t._id, status='ok')
- t.delete()
- ThreadLocalORMSession.flush_all()
-
- # re-query, so relationships get reloaded
- ThreadLocalORMSession.close_all()
- p = M.Post.query.get(_id=p._id)
-
- # just make sure this doesn't fail
- p.index()
\ No newline at end of file
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread(discussion_id=d._id, subject='Test Thread')
+ role = M.ProjectRole.by_name('*anonymous')._id
+ post_permission = M.ACE.allow(role, 'post')
+ unmoderated_post_permission = M.ACE.allow(role, 'unmoderated_post')
+ t.acl.append(post_permission)
+ t.acl.append(unmoderated_post_permission)
+ with h.push_config(c, user=M.User.anonymous()):
+ post = t.post('Hey')
+ assert post.status == 'pending'
+ assert notify_moderators.call_count == 1
+
+ @mock.patch('allura.controllers.discuss.g.spam_checker')
+ def test_thread_subject_not_included_in_text_checked(self, spam_checker):
+ spam_checker.check.return_value = False
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread(discussion_id=d._id, subject='Test Thread')
+ t.post('Hello')
+ assert spam_checker.check.call_count == 1
+ assert spam_checker.check.call_args[0][0] == 'Hello'
+
+ def test_post_count(self):
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread(discussion_id=d._id, subject='Test Thread')
+ M.Post(discussion_id=d._id, thread_id=t._id, status='spam')
+ M.Post(discussion_id=d._id, thread_id=t._id, status='ok')
+ M.Post(discussion_id=d._id, thread_id=t._id, status='pending')
+ ThreadLocalORMSession.flush_all()
+ assert t.post_count == 2
+
+ @mock.patch('allura.controllers.discuss.g.spam_checker')
+ def test_spam_num_replies(self, spam_checker):
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread(discussion_id=d._id, subject='Test Thread', num_replies=2)
+ M.Post(discussion_id=d._id, thread_id=t._id, status='ok')
+ ThreadLocalORMSession.flush_all()
+ p1 = M.Post(discussion_id=d._id, thread_id=t._id, status='spam')
+ p1.spam()
+ assert t.num_replies == 1
+
+ def test_deleted_thread_index(self):
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread(discussion_id=d._id, subject='Test Thread')
+ p = M.Post(discussion_id=d._id, thread_id=t._id, status='ok')
+ t.delete()
+ ThreadLocalORMSession.flush_all()
+
+ # re-query, so relationships get reloaded
+ ThreadLocalORMSession.close_all()
+ p = M.Post.query.get(_id=p._id)
+
+ # just make sure this doesn't fail
+ p.index()
diff --git a/Allura/allura/tests/model/test_monq.py b/Allura/allura/tests/model/test_monq.py
index 9dc682149..9b2d51754 100644
--- a/Allura/allura/tests/model/test_monq.py
+++ b/Allura/allura/tests/model/test_monq.py
@@ -16,7 +16,6 @@
# under the License.
import pprint
-from alluratest.tools import with_setup
from ming.orm import ThreadLocalORMSession
@@ -24,14 +23,13 @@ from alluratest.controller import setup_basic_test, setup_global_objects
from allura import model as M
-def setup_method():
+def setup_module():
setup_basic_test()
ThreadLocalORMSession.close_all()
setup_global_objects()
M.MonQTask.query.remove({})
-@with_setup(setup_method)
def test_basic_task():
task = M.MonQTask.post(pprint.pformat, ([5, 6],))
ThreadLocalORMSession.flush_all()
diff --git a/Allura/allura/tests/model/test_neighborhood.py b/Allura/allura/tests/model/test_neighborhood.py
index 26a773331..247370888 100644
--- a/Allura/allura/tests/model/test_neighborhood.py
+++ b/Allura/allura/tests/model/test_neighborhood.py
@@ -25,65 +25,64 @@ from allura.tests import decorators as td
from alluratest.controller import setup_basic_test, setup_global_objects
-def setup_method():
- setup_basic_test()
- setup_with_tools()
+class TestNeighboorhoodModel:
+ def setup_method(self):
+ setup_basic_test()
+ self.setup_with_tools()
-@td.with_wiki
-def setup_with_tools():
- setup_global_objects()
+ @td.with_wiki
+ def setup_with_tools(self):
+ setup_global_objects()
+ def test_neighborhood(self):
+ neighborhood = M.Neighborhood.query.get(name='Projects')
+ # Check css output depends of neighborhood level
+ test_css = ".text{color:#000;}"
+ neighborhood.css = test_css
+ neighborhood.features['css'] = 'none'
+ assert neighborhood.get_custom_css() == ""
+ neighborhood.features['css'] = 'picker'
+ assert neighborhood.get_custom_css() == test_css
+ neighborhood.features['css'] = 'custom'
+ assert neighborhood.get_custom_css() == test_css
+ # Check max projects
+ neighborhood.features['max_projects'] = None
+ assert neighborhood.get_max_projects() is None
+ neighborhood.features['max_projects'] = 500
+ assert neighborhood.get_max_projects() == 500
-@with_setup(setup_method)
-def test_neighborhood():
- neighborhood = M.Neighborhood.query.get(name='Projects')
- # Check css output depends of neighborhood level
- test_css = ".text{color:#000;}"
- neighborhood.css = test_css
- neighborhood.features['css'] = 'none'
- assert neighborhood.get_custom_css() == ""
- neighborhood.features['css'] = 'picker'
- assert neighborhood.get_custom_css() == test_css
- neighborhood.features['css'] = 'custom'
- assert neighborhood.get_custom_css() == test_css
- # Check max projects
- neighborhood.features['max_projects'] = None
- assert neighborhood.get_max_projects() is None
- neighborhood.features['max_projects'] = 500
- assert neighborhood.get_max_projects() == 500
+ # Check picker css styles
+ test_css_dict = {'barontop': '#444',
+ 'titlebarbackground': '#555',
+ 'projecttitlefont': 'arial,sans-serif',
+ 'projecttitlecolor': '#333',
+ 'titlebarcolor': '#666'}
+ css_text = neighborhood.compile_css_for_picker(test_css_dict)
+ assert '#333' in css_text
+ assert '#444' in css_text
+ assert '#555' in css_text
+ assert '#666' in css_text
+ assert 'arial,sans-serif' in css_text
+ neighborhood.css = css_text
+ styles_list = neighborhood.get_css_for_picker()
+ for style in styles_list:
+ assert test_css_dict[style['name']] == style['value']
- # Check picker css styles
- test_css_dict = {'barontop': '#444',
- 'titlebarbackground': '#555',
- 'projecttitlefont': 'arial,sans-serif',
- 'projecttitlecolor': '#333',
- 'titlebarcolor': '#666'}
- css_text = neighborhood.compile_css_for_picker(test_css_dict)
- assert '#333' in css_text
- assert '#444' in css_text
- assert '#555' in css_text
- assert '#666' in css_text
- assert 'arial,sans-serif' in css_text
- neighborhood.css = css_text
- styles_list = neighborhood.get_css_for_picker()
- for style in styles_list:
- assert test_css_dict[style['name']] == style['value']
+ # Check neighborhood custom css showing
+ neighborhood.features['css'] = 'none'
+ assert not neighborhood.allow_custom_css
+ neighborhood.features['css'] = 'picker'
+ assert neighborhood.allow_custom_css
+ neighborhood.features['css'] = 'custom'
+ assert neighborhood.allow_custom_css
- # Check neighborhood custom css showing
- neighborhood.features['css'] = 'none'
- assert not neighborhood.allow_custom_css
- neighborhood.features['css'] = 'picker'
- assert neighborhood.allow_custom_css
- neighborhood.features['css'] = 'custom'
- assert neighborhood.allow_custom_css
+ neighborhood.anchored_tools = 'wiki:Wiki, tickets:Tickets'
+ assert neighborhood.get_anchored_tools()['wiki'] == 'Wiki'
+ assert neighborhood.get_anchored_tools()['tickets'] == 'Tickets'
- neighborhood.anchored_tools = 'wiki:Wiki, tickets:Tickets'
- assert neighborhood.get_anchored_tools()['wiki'] == 'Wiki'
- assert neighborhood.get_anchored_tools()['tickets'] == 'Tickets'
+ neighborhood.prohibited_tools = 'wiki, tickets'
+ assert neighborhood.get_prohibited_tools() == ['wiki', 'tickets']
- neighborhood.prohibited_tools = 'wiki, tickets'
- assert neighborhood.get_prohibited_tools() == ['wiki', 'tickets']
-
- # Check properties
- assert neighborhood.shortname == "p"
+ # Check properties
+ assert neighborhood.shortname == "p"
diff --git a/Allura/allura/tests/model/test_oauth.py b/Allura/allura/tests/model/test_oauth.py
index 67d12b094..edb396abc 100644
--- a/Allura/allura/tests/model/test_oauth.py
+++ b/Allura/allura/tests/model/test_oauth.py
@@ -16,28 +16,26 @@
# under the License.
-from alluratest.tools import with_setup, assert_equal, assert_not_equal
-
from ming.odm import ThreadLocalORMSession
from allura import model as M
from alluratest.controller import setup_basic_test, setup_global_objects
-def setup_method():
- setup_basic_test()
- ThreadLocalORMSession.close_all()
- setup_global_objects()
+class TestOAuthModel:
+ def setup_method(self):
+ setup_basic_test()
+ ThreadLocalORMSession.close_all()
+ setup_global_objects()
-@with_setup(setup_method)
-def test_upsert():
- admin = M.User.by_username('test-admin')
- user = M.User.by_username('test-user')
- name = 'test-token'
- token1 = M.OAuthConsumerToken.upsert(name, admin)
- token2 = M.OAuthConsumerToken.upsert(name, admin)
- token3 = M.OAuthConsumerToken.upsert(name, user)
- assert M.OAuthConsumerToken.query.find().count() == 2
- assert token1._id == token2._id
- assert token1._id != token3._id
+ def test_upsert(self):
+ admin = M.User.by_username('test-admin')
+ user = M.User.by_username('test-user')
+ name = 'test-token'
+ token1 = M.OAuthConsumerToken.upsert(name, admin)
+ token2 = M.OAuthConsumerToken.upsert(name, admin)
+ token3 = M.OAuthConsumerToken.upsert(name, user)
+ assert M.OAuthConsumerToken.query.find().count() == 2
+ assert token1._id == token2._id
+ assert token1._id != token3._id
diff --git a/Allura/allura/tests/model/test_project.py b/Allura/allura/tests/model/test_project.py
index 524754bc3..6a1d44a8a 100644
--- a/Allura/allura/tests/model/test_project.py
+++ b/Allura/allura/tests/model/test_project.py
@@ -31,172 +31,161 @@ from allura.lib.exceptions import ToolError, Invalid
from mock import MagicMock, patch
-def setup_method():
- setup_basic_test()
- setup_with_tools()
-
-
-@td.with_wiki
-def setup_with_tools():
- setup_global_objects()
-
-
-@with_setup(setup_method)
-def test_project():
- assert type(c.project.sidebar_menu()) == list
- assert c.project.script_name in c.project.url()
- old_proj = c.project
- h.set_context('test/sub1', neighborhood='Projects')
- assert type(c.project.sidebar_menu()) == list
- assert type(c.project.sitemap()) == list
- assert c.project.sitemap()[1].label == 'Admin'
- assert old_proj in list(c.project.parent_iter())
- h.set_context('test', 'wiki', neighborhood='Projects')
- adobe_nbhd = M.Neighborhood.query.get(name='Adobe')
- p = M.Project.query.get(
- shortname='adobe-1', neighborhood_id=adobe_nbhd._id)
- # assert 'http' in p.url() # We moved adobe into /adobe/, not
- # http://adobe....
- assert p.script_name in p.url()
- assert c.project.shortname == 'test'
- assert '<p>' in c.project.description_html
- c.project.uninstall_app('hello-test-mount-point')
- ThreadLocalORMSession.flush_all()
-
- c.project.install_app('Wiki', 'hello-test-mount-point')
- c.project.support_page = 'hello-test-mount-point'
- assert c.project.app_config('wiki').tool_name == 'wiki'
- ThreadLocalORMSession.flush_all()
- with td.raises(ToolError):
- # already installed
+class TestProjectModel:
+
+ def setup_method(self):
+ setup_basic_test()
+ self.setup_with_tools()
+
+ @td.with_wiki
+ def setup_with_tools(self):
+ setup_global_objects()
+
+ def test_project(self):
+ assert type(c.project.sidebar_menu()) == list
+ assert c.project.script_name in c.project.url()
+ old_proj = c.project
+ h.set_context('test/sub1', neighborhood='Projects')
+ assert type(c.project.sidebar_menu()) == list
+ assert type(c.project.sitemap()) == list
+ assert c.project.sitemap()[1].label == 'Admin'
+ assert old_proj in list(c.project.parent_iter())
+ h.set_context('test', 'wiki', neighborhood='Projects')
+ adobe_nbhd = M.Neighborhood.query.get(name='Adobe')
+ p = M.Project.query.get(
+ shortname='adobe-1', neighborhood_id=adobe_nbhd._id)
+ # assert 'http' in p.url() # We moved adobe into /adobe/, not
+ # http://adobe....
+ assert p.script_name in p.url()
+ assert c.project.shortname == 'test'
+ assert '<p>' in c.project.description_html
+ c.project.uninstall_app('hello-test-mount-point')
+ ThreadLocalORMSession.flush_all()
+
c.project.install_app('Wiki', 'hello-test-mount-point')
- ThreadLocalORMSession.flush_all()
- c.project.uninstall_app('hello-test-mount-point')
- ThreadLocalORMSession.flush_all()
- with td.raises(ToolError):
- # mount point reserved
- c.project.install_app('Wiki', 'feed')
- with td.raises(ToolError):
- # mount point too long
- c.project.install_app('Wiki', 'a' * 64)
- with td.raises(ToolError):
- # mount point must begin with letter
- c.project.install_app('Wiki', '1')
- # single letter mount points are allowed
- c.project.install_app('Wiki', 'a')
- # Make sure the project support page is reset if the tool it was pointing
- # to is uninstalled.
- assert c.project.support_page == ''
- app_config = c.project.app_config('hello')
- app_inst = c.project.app_instance(app_config)
- app_inst = c.project.app_instance('hello')
- app_inst = c.project.app_instance('hello2123')
- c.project.breadcrumbs()
- c.app.config.breadcrumbs()
-
-
-@with_setup(setup_method)
-def test_install_app_validates_options():
- from forgetracker.tracker_main import ForgeTrackerApp
- name = 'TicketMonitoringEmail'
- opt = [o for o in ForgeTrackerApp.config_options if o.name == name][0]
- opt.validator = fev.Email(not_empty=True)
- with patch.object(ForgeTrackerApp, 'config_on_install', new=[opt.name]):
- for v in [None, '', 'bad@email']:
- with td.raises(ToolError):
- c.project.install_app('Tickets', 'test-tickets', **{name: v})
- assert c.project.app_instance('test-tickets') == None
- c.project.install_app('Tickets', 'test-tickets', **{name: 'e@e.com'})
- app = c.project.app_instance('test-tickets')
- assert app.config.options[name] == 'e@e.com'
-
-
-def test_project_index():
- project, idx = c.project, c.project.index()
- assert 'id' in idx
- assert idx['id'] == project.index_id()
- assert 'title' in idx
- assert 'type_s' in idx
- assert 'deleted_b' in idx
- assert 'private_b' in idx
- assert 'neighborhood_id_s' in idx
- assert 'short_description_t' in idx
- assert 'url_s' in idx
-
-
-def test_subproject():
- project = M.Project.query.get(shortname='test')
- with td.raises(ToolError):
- with patch('allura.lib.plugin.ProjectRegistrationProvider') as Provider:
- Provider.get().shortname_validator.to_python.side_effect = Invalid(
- 'name', 'value', {})
- # name doesn't validate
- sp = project.new_subproject('test-proj-nose')
- sp = project.new_subproject('test-proj-nose')
- spp = sp.new_subproject('spp')
- ThreadLocalORMSession.flush_all()
- sp.delete()
- ThreadLocalORMSession.flush_all()
-
-
-@td.with_wiki
-def test_anchored_tools():
- c.project.neighborhood.anchored_tools = 'wiki:Wiki, tickets:Ticket'
- c.project.install_app = MagicMock()
- assert c.project.sitemap()[0].label == 'Wiki'
- assert c.project.install_app.call_args[0][0] == 'tickets'
- assert c.project.ordered_mounts()[0]['ac'].tool_name == 'wiki'
-
-
-def test_set_ordinal_to_admin_tool():
- with h.push_config(c,
- user=M.User.by_username('test-admin'),
- project=M.Project.query.get(shortname='test')):
- sm = c.project.sitemap()
- assert sm[-1].tool_name == 'admin'
-
-
-@with_setup(setup_method)
-def test_users_and_roles():
- p = M.Project.query.get(shortname='test')
- sub = p.direct_subprojects[0]
- u = M.User.by_username('test-admin')
- assert p.users_with_role('Admin') == [u]
- assert p.users_with_role('Admin') == sub.users_with_role('Admin')
- assert p.users_with_role('Admin') == p.admins()
-
- user = p.admins()[0]
- user.disabled = True
- ThreadLocalORMSession.flush_all()
- assert p.users_with_role('Admin') == []
- assert p.users_with_role('Admin') == p.admins()
-
-
-@with_setup(setup_method)
-def test_project_disabled_users():
- p = M.Project.query.get(shortname='test')
- users = p.users()
- assert users[0].username == 'test-admin'
- user = M.User.by_username('test-admin')
- user.disabled = True
- ThreadLocalORMSession.flush_all()
- users = p.users()
- assert users == []
-
-def test_screenshot_unicode_serialization():
- p = M.Project.query.get(shortname='test')
- screenshot_unicode = M.ProjectFile(project_id=p._id, category='screenshot', caption="ConSelección", filename='ConSelección.jpg')
- screenshot_ascii = M.ProjectFile(project_id=p._id, category='screenshot', caption='test-screenshot', filename='test_file.jpg')
- ThreadLocalORMSession.flush_all()
-
- serialized = p.__json__()
- screenshots = sorted(serialized['screenshots'], key=lambda k: k['caption'])
-
- assert len(screenshots) == 2
- assert screenshots[0]['url'] == 'http://localhost/p/test/screenshot/ConSelecci%C3%B3n.jpg'
- assert screenshots[0]['caption'] == "ConSelección"
- assert screenshots[0]['thumbnail_url'] == 'http://localhost/p/test/screenshot/ConSelecci%C3%B3n.jpg/thumb'
-
- assert screenshots[1]['url'] == 'http://localhost/p/test/screenshot/test_file.jpg'
- assert screenshots[1]['caption'] == 'test-screenshot'
- assert screenshots[1]['thumbnail_url'] == 'http://localhost/p/test/screenshot/test_file.jpg/thumb'
+ c.project.support_page = 'hello-test-mount-point'
+ assert c.project.app_config('wiki').tool_name == 'wiki'
+ ThreadLocalORMSession.flush_all()
+ with td.raises(ToolError):
+ # already installed
+ c.project.install_app('Wiki', 'hello-test-mount-point')
+ ThreadLocalORMSession.flush_all()
+ c.project.uninstall_app('hello-test-mount-point')
+ ThreadLocalORMSession.flush_all()
+ with td.raises(ToolError):
+ # mount point reserved
+ c.project.install_app('Wiki', 'feed')
+ with td.raises(ToolError):
+ # mount point too long
+ c.project.install_app('Wiki', 'a' * 64)
+ with td.raises(ToolError):
+ # mount point must begin with letter
+ c.project.install_app('Wiki', '1')
+ # single letter mount points are allowed
+ c.project.install_app('Wiki', 'a')
+ # Make sure the project support page is reset if the tool it was pointing
+ # to is uninstalled.
+ assert c.project.support_page == ''
+ app_config = c.project.app_config('hello')
+ app_inst = c.project.app_instance(app_config)
+ app_inst = c.project.app_instance('hello')
+ app_inst = c.project.app_instance('hello2123')
+ c.project.breadcrumbs()
+ c.app.config.breadcrumbs()
+
+ def test_install_app_validates_options(self):
+ from forgetracker.tracker_main import ForgeTrackerApp
+ name = 'TicketMonitoringEmail'
+ opt = [o for o in ForgeTrackerApp.config_options if o.name == name][0]
+ opt.validator = fev.Email(not_empty=True)
+ with patch.object(ForgeTrackerApp, 'config_on_install', new=[opt.name]):
+ for v in [None, '', 'bad@email']:
+ with td.raises(ToolError):
+ c.project.install_app('Tickets', 'test-tickets', **{name: v})
+ assert c.project.app_instance('test-tickets') == None
+ c.project.install_app('Tickets', 'test-tickets', **{name: 'e@e.com'})
+ app = c.project.app_instance('test-tickets')
+ assert app.config.options[name] == 'e@e.com'
+
+ def test_project_index(self):
+ project, idx = c.project, c.project.index()
+ assert 'id' in idx
+ assert idx['id'] == project.index_id()
+ assert 'title' in idx
+ assert 'type_s' in idx
+ assert 'deleted_b' in idx
+ assert 'private_b' in idx
+ assert 'neighborhood_id_s' in idx
+ assert 'short_description_t' in idx
+ assert 'url_s' in idx
+
+ def test_subproject(self):
+ project = M.Project.query.get(shortname='test')
+ with td.raises(ToolError):
+ with patch('allura.lib.plugin.ProjectRegistrationProvider') as Provider:
+ Provider.get().shortname_validator.to_python.side_effect = Invalid(
+ 'name', 'value', {})
+ # name doesn't validate
+ sp = project.new_subproject('test-proj-nose')
+ sp = project.new_subproject('test-proj-nose')
+ spp = sp.new_subproject('spp')
+ ThreadLocalORMSession.flush_all()
+ sp.delete()
+ ThreadLocalORMSession.flush_all()
+
+ @td.with_wiki
+ def test_anchored_tools(self):
+ c.project.neighborhood.anchored_tools = 'wiki:Wiki, tickets:Ticket'
+ c.project.install_app = MagicMock()
+ assert c.project.sitemap()[0].label == 'Wiki'
+ assert c.project.install_app.call_args[0][0] == 'tickets'
+ assert c.project.ordered_mounts()[0]['ac'].tool_name == 'wiki'
+
+ def test_set_ordinal_to_admin_tool(self):
+ with h.push_config(c,
+ user=M.User.by_username('test-admin'),
+ project=M.Project.query.get(shortname='test')):
+ sm = c.project.sitemap()
+ assert sm[-1].tool_name == 'admin'
+
+ def test_users_and_roles(self):
+ p = M.Project.query.get(shortname='test')
+ sub = p.direct_subprojects[0]
+ u = M.User.by_username('test-admin')
+ assert p.users_with_role('Admin') == [u]
+ assert p.users_with_role('Admin') == sub.users_with_role('Admin')
+ assert p.users_with_role('Admin') == p.admins()
+
+ user = p.admins()[0]
+ user.disabled = True
+ ThreadLocalORMSession.flush_all()
+ assert p.users_with_role('Admin') == []
+ assert p.users_with_role('Admin') == p.admins()
+
+ def test_project_disabled_users(self):
+ p = M.Project.query.get(shortname='test')
+ users = p.users()
+ assert users[0].username == 'test-admin'
+ user = M.User.by_username('test-admin')
+ user.disabled = True
+ ThreadLocalORMSession.flush_all()
+ users = p.users()
+ assert users == []
+
+ def test_screenshot_unicode_serialization(self):
+ p = M.Project.query.get(shortname='test')
+ screenshot_unicode = M.ProjectFile(project_id=p._id, category='screenshot', caption="ConSelección", filename='ConSelección.jpg')
+ screenshot_ascii = M.ProjectFile(project_id=p._id, category='screenshot', caption='test-screenshot', filename='test_file.jpg')
+ ThreadLocalORMSession.flush_all()
+
+ serialized = p.__json__()
+ screenshots = sorted(serialized['screenshots'], key=lambda k: k['caption'])
+
+ assert len(screenshots) == 2
+ assert screenshots[0]['url'] == 'http://localhost/p/test/screenshot/ConSelecci%C3%B3n.jpg'
+ assert screenshots[0]['caption'] == "ConSelección"
+ assert screenshots[0]['thumbnail_url'] == 'http://localhost/p/test/screenshot/ConSelecci%C3%B3n.jpg/thumb'
+
+ assert screenshots[1]['url'] == 'http://localhost/p/test/screenshot/test_file.jpg'
+ assert screenshots[1]['caption'] == 'test-screenshot'
+ assert screenshots[1]['thumbnail_url'] == 'http://localhost/p/test/screenshot/test_file.jpg/thumb'
diff --git a/Allura/allura/tests/test_app.py b/Allura/allura/tests/test_app.py
index 71733a2d5..8a6924ce2 100644
--- a/Allura/allura/tests/test_app.py
+++ b/Allura/allura/tests/test_app.py
@@ -20,161 +20,152 @@ import mock
from ming.base import Object
import pytest
from formencode import validators as fev
+from textwrap import dedent
from alluratest.controller import setup_unit_test
from alluratest.tools import with_setup
from allura import app
from allura.lib.app_globals import Icon
from allura.lib import mail_util
-from alluratest.pytest_helpers import with_nose_compatibility
-
-
-def setup_method():
- setup_unit_test()
- c.user._id = None
- c.project = mock.Mock()
- c.project.name = 'Test Project'
- c.project.shortname = 'tp'
- c.project._id = 'testproject/'
- c.project.url = lambda: '/testproject/'
- app_config = mock.Mock()
- app_config._id = None
- app_config.project_id = 'testproject/'
- app_config.tool_name = 'tool'
- app_config.options = Object(mount_point='foo')
- c.app = mock.Mock()
- c.app.config = app_config
- c.app.config.script_name = lambda: '/testproject/test_application/'
- c.app.config.url = lambda: 'http://testproject/test_application/'
- c.app.url = c.app.config.url()
- c.app.__version__ = '0.0'
-
-def test_config_options():
- options = [
- app.ConfigOption('test1', str, 'MyTestValue'),
- app.ConfigOption('test2', str, lambda:'MyTestValue')]
- assert options[0].default == 'MyTestValue'
- assert options[1].default == 'MyTestValue'
-
-
-def test_config_options_render_attrs():
- opt = app.ConfigOption('test1', str, None, extra_attrs={'type': 'url'})
- assert opt.render_attrs() == 'type="url"'
-
-
-def test_config_option_without_validator():
- opt = app.ConfigOption('test1', str, None)
- assert opt.validate(None) == None
- assert opt.validate('') == ''
- assert opt.validate('val') == 'val'
-
-
-def test_config_option_with_validator():
- v = fev.NotEmpty()
- opt = app.ConfigOption('test1', str, None, validator=v)
- assert opt.validate('val') == 'val'
- pytest.raises(fev.Invalid, opt.validate, None)
- pytest.raises(fev.Invalid, opt.validate, '')
-
-
-@with_setup(setup_method)
-def test_options_on_install_default():
- a = app.Application(c.project, c.app.config)
- assert a.options_on_install() == []
-
-
-@with_setup(setup_method)
-def test_options_on_install():
- opts = [app.ConfigOption('url', str, None),
- app.ConfigOption('private', bool, None)]
- class TestApp(app.Application):
- config_options = app.Application.config_options + opts + [
- app.ConfigOption('not_on_install', str, None),
- ]
- config_on_install = ['url', 'private']
-
- a = TestApp(c.project, c.app.config)
- assert a.options_on_install() == opts
-
-@with_setup(setup_method)
-def test_main_menu():
- class TestApp(app.Application):
- @property
- def sitemap(self):
- children = [app.SitemapEntry('New', 'new', ui_icon=Icon('some-icon')),
- app.SitemapEntry('Recent', 'recent'),
- ]
- return [app.SitemapEntry('My Tool', '.')[children]]
-
- a = TestApp(c.project, c.app.config)
- main_menu = a.main_menu()
- assert len(main_menu) == 1
- assert main_menu[0].children == [] # default main_menu implementation should drop the children from sitemap()
-
-
-@with_setup(setup_method)
-def test_sitemap():
- sm = app.SitemapEntry('test', '')[
- app.SitemapEntry('a', 'a/'),
- app.SitemapEntry('b', 'b/')]
- sm[app.SitemapEntry(lambda app:app.config.script_name(), 'c/')]
- bound_sm = sm.bind_app(c.app)
- assert bound_sm.url == 'http://testproject/test_application/', bound_sm.url
- assert bound_sm.children[
- -1].label == '/testproject/test_application/', bound_sm.children[-1].label
- assert len(sm.children) == 3
- sm.extend([app.SitemapEntry('a', 'a/')[
- app.SitemapEntry('d', 'd/')]])
- assert len(sm.children) == 3
-
-
-@with_setup(setup_method)
-@mock.patch('allura.app.Application.PostClass.query.get')
-def test_handle_artifact_unicode(qg):
- """
- Tests that app.handle_artifact_message can accept utf strings
- """
- ticket = mock.MagicMock()
- ticket.get_discussion_thread.return_value = (mock.MagicMock(), mock.MagicMock())
- post = mock.MagicMock()
- qg.return_value = post
-
- a = app.Application(c.project, c.app.config)
-
- msg = dict(payload='foo ƒ†©¥˙¨ˆ'.encode(), message_id=1, headers={})
- a.handle_artifact_message(ticket, msg)
- assert post.attach.call_args[0][1].getvalue() == 'foo ƒ†©¥˙¨ˆ'.encode()
-
- msg = dict(payload=b'foo', message_id=1, headers={})
- a.handle_artifact_message(ticket, msg)
- assert post.attach.call_args[0][1].getvalue() == b'foo'
-
- msg = dict(payload="\x94my quote\x94".encode(), message_id=1, headers={})
- a.handle_artifact_message(ticket, msg)
- assert post.attach.call_args[0][1].getvalue() == '\x94my quote\x94'.encode()
-
- # assert against prod example
- msg_raw = """Message-Id: <15...@webmail.messagingengine.com>
-From: foo <fo...@bar.com>
-To: "[forge:site-support]" <15...@site-support.forge.p.re.sf.net>
-MIME-Version: 1.0
-Content-Transfer-Encoding: 7bit
-Content-Type: multipart/alternative; boundary="_----------=_150235203132168580"
-Date: Thu, 10 Aug 2017 10:00:31 +0200
-Subject: Re: [forge:site-support] #15391 Unable to join (my own) mailing list
-This is a multi-part message in MIME format.
---_----------=_150235203132168580
-Content-Transfer-Encoding: quoted-printable
-Content-Type: text/plain; charset="utf-8"
-Hi
---_----------=_150235203132168580
-Content-Transfer-Encoding: quoted-printable
-Content-Type: text/html; charset="utf-8"
-<!DOCTYPE html>
-<html><body>Hi</body></html>
---_----------=_150235203132168580--
- """
- msg = mail_util.parse_message(msg_raw)
- for p in [p for p in msg['parts'] if p['payload'] is not None]:
- # filter here mimics logic in `route_email`
- a.handle_artifact_message(ticket, p)
+
+
+class TestApp:
+
+ def setup_method(self):
+ setup_unit_test()
+ c.user._id = None
+ c.project = mock.Mock()
+ c.project.name = 'Test Project'
+ c.project.shortname = 'tp'
+ c.project._id = 'testproject/'
+ c.project.url = lambda: '/testproject/'
+ app_config = mock.Mock()
+ app_config._id = None
+ app_config.project_id = 'testproject/'
+ app_config.tool_name = 'tool'
+ app_config.options = Object(mount_point='foo')
+ c.app = mock.Mock()
+ c.app.config = app_config
+ c.app.config.script_name = lambda: '/testproject/test_application/'
+ c.app.config.url = lambda: 'http://testproject/test_application/'
+ c.app.url = c.app.config.url()
+ c.app.__version__ = '0.0'
+
+ def test_config_options(self):
+ options = [
+ app.ConfigOption('test1', str, 'MyTestValue'),
+ app.ConfigOption('test2', str, lambda:'MyTestValue')]
+ assert options[0].default == 'MyTestValue'
+ assert options[1].default == 'MyTestValue'
+
+ def test_config_options_render_attrs(self):
+ opt = app.ConfigOption('test1', str, None, extra_attrs={'type': 'url'})
+ assert opt.render_attrs() == 'type="url"'
+
+ def test_config_option_without_validator(self):
+ opt = app.ConfigOption('test1', str, None)
+ assert opt.validate(None) == None
+ assert opt.validate('') == ''
+ assert opt.validate('val') == 'val'
+
+ def test_config_option_with_validator(self):
+ v = fev.NotEmpty()
+ opt = app.ConfigOption('test1', str, None, validator=v)
+ assert opt.validate('val') == 'val'
+ pytest.raises(fev.Invalid, opt.validate, None)
+ pytest.raises(fev.Invalid, opt.validate, '')
+
+ def test_options_on_install_default(self):
+ a = app.Application(c.project, c.app.config)
+ assert a.options_on_install() == []
+
+ def test_options_on_install(self):
+ opts = [app.ConfigOption('url', str, None),
+ app.ConfigOption('private', bool, None)]
+ class TestApp(app.Application):
+ config_options = app.Application.config_options + opts + [
+ app.ConfigOption('not_on_install', str, None),
+ ]
+ config_on_install = ['url', 'private']
+
+ a = TestApp(c.project, c.app.config)
+ assert a.options_on_install() == opts
+
+ def test_main_menu(self):
+ class TestApp(app.Application):
+ @property
+ def sitemap(self):
+ children = [app.SitemapEntry('New', 'new', ui_icon=Icon('some-icon')),
+ app.SitemapEntry('Recent', 'recent'),
+ ]
+ return [app.SitemapEntry('My Tool', '.')[children]]
+
+ a = TestApp(c.project, c.app.config)
+ main_menu = a.main_menu()
+ assert len(main_menu) == 1
+ assert main_menu[0].children == [] # default main_menu implementation should drop the children from sitemap()
+
+ def test_sitemap(self):
+ sm = app.SitemapEntry('test', '')[
+ app.SitemapEntry('a', 'a/'),
+ app.SitemapEntry('b', 'b/')]
+ sm[app.SitemapEntry(lambda app:app.config.script_name(), 'c/')]
+ bound_sm = sm.bind_app(c.app)
+ assert bound_sm.url == 'http://testproject/test_application/', bound_sm.url
+ assert bound_sm.children[
+ -1].label == '/testproject/test_application/', bound_sm.children[-1].label
+ assert len(sm.children) == 3
+ sm.extend([app.SitemapEntry('a', 'a/')[
+ app.SitemapEntry('d', 'd/')]])
+ assert len(sm.children) == 3
+
+ @mock.patch('allura.app.Application.PostClass.query.get')
+ def test_handle_artifact_unicode(self, qg):
+ """
+ Tests that app.handle_artifact_message can accept utf strings
+ """
+ ticket = mock.MagicMock()
+ ticket.get_discussion_thread.return_value = (mock.MagicMock(), mock.MagicMock())
+ post = mock.MagicMock()
+ qg.return_value = post
+
+ a = app.Application(c.project, c.app.config)
+
+ msg = dict(payload='foo ƒ†©¥˙¨ˆ'.encode(), message_id=1, headers={})
+ a.handle_artifact_message(ticket, msg)
+ assert post.attach.call_args[0][1].getvalue() == 'foo ƒ†©¥˙¨ˆ'.encode()
+
+ msg = dict(payload=b'foo', message_id=1, headers={})
+ a.handle_artifact_message(ticket, msg)
+ assert post.attach.call_args[0][1].getvalue() == b'foo'
+
+ msg = dict(payload="\x94my quote\x94".encode(), message_id=1, headers={})
+ a.handle_artifact_message(ticket, msg)
+ assert post.attach.call_args[0][1].getvalue() == '\x94my quote\x94'.encode()
+
+ # assert against prod example
+ msg_raw = dedent("""\
+ Message-Id: <15...@webmail.messagingengine.com>
+ From: foo <fo...@bar.com>
+ To: "[forge:site-support]" <15...@site-support.forge.p.re.sf.net>
+ MIME-Version: 1.0
+ Content-Transfer-Encoding: 7bit
+ Content-Type: multipart/alternative; boundary="_----------=_150235203132168580"
+ Date: Thu, 10 Aug 2017 10:00:31 +0200
+ Subject: Re: [forge:site-support] #15391 Unable to join (my own) mailing list
+ This is a multi-part message in MIME format.
+ --_----------=_150235203132168580
+ Content-Transfer-Encoding: quoted-printable
+ Content-Type: text/plain; charset="utf-8"
+ Hi
+ --_----------=_150235203132168580
+ Content-Transfer-Encoding: quoted-printable
+ Content-Type: text/html; charset="utf-8"
+ <!DOCTYPE html>
+ <html><body>Hi</body></html>
+ --_----------=_150235203132168580--
+ """)
+ msg = mail_util.parse_message(msg_raw)
+ for p in [p for p in msg['parts'] if p['payload'] is not None]:
+ # filter here mimics logic in `route_email`
+ a.handle_artifact_message(ticket, p)