You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@allura.apache.org by he...@apache.org on 2015/05/29 22:40:32 UTC
[10/45] allura git commit: [#7878] Used 2to3 to see what issues would
come up
http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/model/test_auth.py
----------------------------------------------------------------------
diff --git a/tests/model/test_auth.py b/tests/model/test_auth.py
new file mode 100644
index 0000000..959ef10
--- /dev/null
+++ b/tests/model/test_auth.py
@@ -0,0 +1,420 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Model tests for auth
+"""
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
+
+from nose.tools import (
+ with_setup,
+ assert_equal,
+ assert_not_equal,
+ assert_true,
+ assert_not_in,
+ assert_in,
+)
+from pylons import tmpl_context as c, app_globals as g
+from webob import Request
+from mock import patch, Mock
+from datetime import datetime, timedelta
+
+from ming.orm.ormsession import ThreadLocalORMSession
+from ming.odm import session
+
+from allura import model as M
+from allura.lib import plugin
+from allura.tests import decorators as td
+from alluratest.controller import setup_basic_test, setup_global_objects, setup_functional_test
+
+
+def setUp():
+ # Per-test fixture: run the basic test setup, close every thread-local
+ # ORM session, then install the global objects -- in that order.
+ setup_basic_test()
+ ThreadLocalORMSession.close_all()
+ setup_global_objects()
+
+
+@with_setup(setUp)
+def test_email_address():
+ # Exercises creating, claiming and looking up EmailAddress objects for
+ # the current user (c.user).
+ addr = M.EmailAddress(email='test_admin@domain.net',
+ claimed_by_user_id=c.user._id)
+ ThreadLocalORMSession.flush_all()
+ assert addr.claimed_by_user() == c.user
+ addr2 = M.EmailAddress.create('test@domain.net')
+ addr3 = M.EmailAddress.create('test_admin@domain.net')
+
+ # Duplicate emails are allowed, until the email is confirmed
+ assert addr3 is not addr
+
+ assert addr2 is not addr
+ assert addr2
+ # Creating the same address with an upper-cased domain still yields a
+ # distinct object.
+ addr4 = M.EmailAddress.create('test@DOMAIN.NET')
+ assert addr4 is not addr2
+
+ assert addr is c.user.address_object('test_admin@domain.net')
+ # Claiming with an upper-cased domain records the lower-cased-domain form.
+ c.user.claim_address('test@DOMAIN.NET')
+ assert 'test@domain.net' in c.user.email_addresses
+
+
+@with_setup(setUp)
+def test_email_address_lookup_helpers():
+ # Checks that EmailAddress.get() and EmailAddress.find() treat the domain
+ # case-insensitively but the local part case-sensitively, and that
+ # None / invalid input matches nothing.
+ addr = M.EmailAddress.create('TEST@DOMAIN.NET')
+ nobody = M.EmailAddress.create('nobody@example.com')
+ ThreadLocalORMSession.flush_all()
+ # Only the domain is lower-cased on creation.
+ assert_equal(addr.email, 'TEST@domain.net')
+
+ assert_equal(M.EmailAddress.get(email='TEST@DOMAIN.NET'), addr)
+ assert_equal(M.EmailAddress.get(email='TEST@domain.net'), addr)
+ assert_equal(M.EmailAddress.get(email='test@domain.net'), None)
+ assert_equal(M.EmailAddress.get(email=None), None)
+ assert_equal(M.EmailAddress.get(email='nobody@example.com'), nobody)
+ # invalid email returns None, but not nobody@example.com as before
+ assert_equal(M.EmailAddress.get(email='invalid'), None)
+
+ # Same expectations through the query-style find() helper.
+ assert_equal(M.EmailAddress.find(dict(email='TEST@DOMAIN.NET')).all(), [addr])
+ assert_equal(M.EmailAddress.find(dict(email='TEST@domain.net')).all(), [addr])
+ assert_equal(M.EmailAddress.find(dict(email='test@domain.net')).all(), [])
+ assert_equal(M.EmailAddress.find(dict(email=None)).all(), [])
+ assert_equal(M.EmailAddress.find(dict(email='nobody@example.com')).all(), [nobody])
+ # invalid email returns empty query, but not nobody@example.com as before
+ assert_equal(M.EmailAddress.find(dict(email='invalid')).all(), [])
+
+
+@with_setup(setUp)
+def test_email_address_canonical():
+    """Check the normalisation performed by EmailAddress.canonical().
+
+    Cases covered: lower-casing the domain, leaving an already canonical
+    address untouched, extracting the address from a ``Name <address>``
+    form, stripping surrounding whitespace, tolerating an ``@`` in the
+    display name or in the local part, and returning None for input that
+    is not an email address.
+    """
+    assert_equal(M.EmailAddress.canonical('nobody@EXAMPLE.COM'),
+                 'nobody@example.com')
+    assert_equal(M.EmailAddress.canonical('nobody@example.com'),
+                 'nobody@example.com')
+    # The bracketed address must be spelled out in full: it is what the
+    # expected value on the following line is derived from.
+    assert_equal(M.EmailAddress.canonical('I Am Nobody <nobody@example.com>'),
+                 'nobody@example.com')
+    assert_equal(M.EmailAddress.canonical(' nobody@example.com\t'),
+                 'nobody@example.com')
+    assert_equal(M.EmailAddress.canonical('I Am@Nobody <nobody@example.com> '),
+                 'nobody@example.com')
+    assert_equal(M.EmailAddress.canonical(' No@body <no@body@example.com> '),
+                 'no@body@example.com')
+    assert_equal(M.EmailAddress.canonical('no@body@example.com'),
+                 'no@body@example.com')
+    assert_equal(M.EmailAddress.canonical('invalid'), None)
+
+@with_setup(setUp)
+def test_email_address_send_verification_link():
+ # Sending a verification link should result in a mail addressed to the
+ # email address being verified.
+ addr = M.EmailAddress(email='test_admin@domain.net',
+ claimed_by_user_id=c.user._id)
+
+ addr.send_verification_link()
+
+ # The SMTP client is mocked while the ready tasks run, so the arguments
+ # passed to sendmail() can be inspected.
+ with patch('allura.tasks.mail_tasks.smtp_client._client') as _client:
+ M.MonQTask.run_ready()
+ return_path, rcpts, body = _client.sendmail.call_args[0]
+ assert_equal(rcpts, ['test_admin@domain.net'])
+
+
+@td.with_user_project('test-admin')
+@with_setup(setUp)
+def test_user():
+ # Covers user URLs, my_projects() (including exclusion of deleted
+ # projects), registering a new user, its roles, and setting/validating
+ # passwords through the local authentication provider.
+ assert c.user.url() .endswith('/u/test-admin/')
+ assert c.user.script_name .endswith('/u/test-admin/')
+ assert_equal(set(p.shortname for p in c.user.my_projects()),
+ set(['test', 'test2', 'u/test-admin', 'adobe-1', '--init--']))
+ # delete one of the projects and make sure it won't appear in my_projects()
+ p = M.Project.query.get(shortname='test2')
+ p.deleted = True
+ ThreadLocalORMSession.flush_all()
+ assert_equal(set(p.shortname for p in c.user.my_projects()),
+ set(['test', 'u/test-admin', 'adobe-1', '--init--']))
+ u = M.User.register(dict(
+ username='nosetest-user'))
+ ThreadLocalORMSession.flush_all()
+ assert_equal(u.private_project().shortname, 'u/nosetest-user')
+ roles = g.credentials.user_roles(
+ u._id, project_id=u.private_project().root_project._id)
+ assert len(roles) == 3, roles
+ # A password validates only until it is replaced by a new one.
+ u.set_password('foo')
+ provider = plugin.LocalAuthenticationProvider(Request.blank('/'))
+ assert provider._validate_password(u, 'foo')
+ assert not provider._validate_password(u, 'foobar')
+ u.set_password('foobar')
+ assert provider._validate_password(u, 'foobar')
+ assert not provider._validate_password(u, 'foo')
+
+
+@with_setup(setUp)
+def test_user_project_creates_on_demand():
+    """A user registered without a project gets one on first private_project() call."""
+    registration = dict(username='foobar123')
+    new_user = M.User.register(registration, make_project=False)
+    ThreadLocalORMSession.flush_all()
+    # No user project exists until it is asked for.
+    before = M.Project.query.get(shortname='u/foobar123')
+    assert not before
+    assert new_user.private_project()
+    after = M.Project.query.get(shortname='u/foobar123')
+    assert after
+
+
+@with_setup(setUp)
+def test_user_project_already_deleted_creates_on_demand():
+    """private_project() provides a live project even if the user's project was deleted."""
+    new_user = M.User.register(dict(username='foobar123'), make_project=True)
+    user_project = M.Project.query.get(shortname='u/foobar123')
+    user_project.deleted = True
+    ThreadLocalORMSession.flush_all()
+    # After the deletion there is no non-deleted project under that shortname.
+    live_before = M.Project.query.get(shortname='u/foobar123', deleted=False)
+    assert not live_before
+    assert new_user.private_project()
+    ThreadLocalORMSession.flush_all()
+    live_after = M.Project.query.get(shortname='u/foobar123', deleted=False)
+    assert live_after
+
+
+@with_setup(setUp)
+def test_user_project_does_not_create_on_demand_for_disabled_user():
+    """A disabled user never gets a user project created on demand."""
+    registration = dict(username='foobar123', disabled=True)
+    disabled_user = M.User.register(registration, make_project=False)
+    ThreadLocalORMSession.flush_all()
+    assert not disabled_user.private_project()
+    stored = M.Project.query.get(shortname='u/foobar123')
+    assert not stored
+
+
+@with_setup(setUp)
+def test_user_project_does_not_create_on_demand_for_anonymous_user():
+    """The anonymous user never gets a user project created on demand."""
+    anonymous = M.User.anonymous()
+    ThreadLocalORMSession.flush_all()
+    assert not anonymous.private_project()
+    # Neither spelling of the anonymous shortname may exist afterwards.
+    for shortname in ('u/anonymous', 'u/*anonymous'):
+        assert not M.Project.query.get(shortname=shortname)
+
+
+@with_setup(setUp)
+@patch('allura.model.auth.log')
+def test_user_by_email_address(log):
+ # User.by_email_address() when two users have claimed (and confirmed) the
+ # same address: disabled users are skipped, and a warning is logged only
+ # when more than one active user matches. `log` is the mocked module
+ # logger of allura.model.auth.
+ u1 = M.User.register(dict(username='abc1'), make_project=False)
+ u2 = M.User.register(dict(username='abc2'), make_project=False)
+ addr1 = M.EmailAddress(email='abc123@abc.me', confirmed=True,
+ claimed_by_user_id=u1._id)
+ addr2 = M.EmailAddress(email='abc123@abc.me', confirmed=True,
+ claimed_by_user_id=u2._id)
+ # both users are disabled
+ u1.disabled, u2.disabled = True, True
+ ThreadLocalORMSession.flush_all()
+ assert_equal(M.User.by_email_address('abc123@abc.me'), None)
+ assert_equal(log.warn.call_count, 0)
+
+ # only u2 is active
+ u1.disabled, u2.disabled = True, False
+ ThreadLocalORMSession.flush_all()
+ assert_equal(M.User.by_email_address('abc123@abc.me'), u2)
+ assert_equal(log.warn.call_count, 0)
+
+ # both are active
+ u1.disabled, u2.disabled = False, False
+ ThreadLocalORMSession.flush_all()
+ assert_in(M.User.by_email_address('abc123@abc.me'), [u1, u2])
+ assert_equal(log.warn.call_count, 1)
+
+ # invalid email returns None, but not user which claimed
+ # nobody@example.com as before
+ nobody = M.EmailAddress(email='nobody@example.com', confirmed=True,
+ claimed_by_user_id=u1._id)
+ ThreadLocalORMSession.flush_all()
+ assert_equal(M.User.by_email_address('nobody@example.com'), u1)
+ assert_equal(M.User.by_email_address('invalid'), None)
+
+
+@with_setup(setUp)
+def test_project_role():
+ # Adds a new named role to the current user's per-project role, then walks
+ # every role the credentials cache reports for the user and checks its
+ # display(), special and user attributes.
+ role = M.ProjectRole(project_id=c.project._id, name='test_role')
+ M.ProjectRole.by_user(c.user, upsert=True).roles.append(role._id)
+ ThreadLocalORMSession.flush_all()
+ roles = g.credentials.user_roles(
+ c.user._id, project_id=c.project.root_project._id)
+ roles_ids = [r['_id'] for r in roles]
+ roles = M.ProjectRole.query.find({'_id': {'$in': roles_ids}})
+ for pr in roles:
+ assert pr.display()
+ # Bare attribute access: presumably only verifies that reading
+ # `special` does not raise -- confirm.
+ pr.special
+ assert pr.user in (c.user, None, M.User.anonymous())
+
+
+@with_setup(setUp)
+def test_default_project_roles():
+    """The project has Admin, Developer and Member roles, nested in that order."""
+    project_query = dict(project_id=c.project._id)
+    named_roles = {}
+    for project_role in M.ProjectRole.query.find(project_query).all():
+        if project_role.name:
+            named_roles[project_role.name] = project_role
+    for expected_name in ('Admin', 'Developer', 'Member'):
+        assert expected_name in list(named_roles.keys()), list(named_roles.keys())
+    # Admin includes Developer, which in turn includes Member.
+    assert named_roles['Developer']._id in named_roles['Admin'].roles
+    assert named_roles['Member']._id in named_roles['Developer'].roles
+
+    # One user is assigned to the project, represented by a relational
+    # (as opposed to named) ProjectRole -- hence the difference of one.
+    total_roles = M.ProjectRole.query.find(dict(
+        project_id=c.project._id)).count()
+    assert len(named_roles) == total_roles - 1
+
+
+@with_setup(setUp)
+def test_email_address_claimed_by_user():
+ # claimed_by_user() returns None once the claiming user is disabled.
+ addr = M.EmailAddress(email='test_admin@domain.net',
+ claimed_by_user_id=c.user._id)
+ c.user.disabled = True
+ ThreadLocalORMSession.flush_all()
+ assert addr.claimed_by_user() is None
+
+
+@td.with_user_project('test-admin')
+@with_setup(setUp)
+def test_user_projects_by_role():
+ # my_projects_by_role_name('Admin') must stop listing a project once the
+ # user's Admin role there is swapped for Developer, while my_projects()
+ # keeps listing it.
+ assert_equal(set(p.shortname for p in c.user.my_projects()),
+ set(['test', 'test2', 'u/test-admin', 'adobe-1', '--init--']))
+ assert_equal(set(p.shortname for p in c.user.my_projects_by_role_name('Admin')),
+ set(['test', 'test2', 'u/test-admin', 'adobe-1', '--init--']))
+ # Remove admin access from c.user to test2 project
+ project = M.Project.query.get(shortname='test2')
+ admin_role = M.ProjectRole.by_name('Admin', project)
+ developer_role = M.ProjectRole.by_name('Developer', project)
+ user_role = M.ProjectRole.by_user(c.user, project=project, upsert=True)
+ user_role.roles.remove(admin_role._id)
+ user_role.roles.append(developer_role._id)
+ ThreadLocalORMSession.flush_all()
+ # Clear the credentials cache before re-querying; presumably it would
+ # otherwise still hold the old role data -- confirm.
+ g.credentials.clear()
+ assert_equal(set(p.shortname for p in c.user.my_projects()),
+ set(['test', 'test2', 'u/test-admin', 'adobe-1', '--init--']))
+ assert_equal(set(p.shortname for p in c.user.my_projects_by_role_name('Admin')),
+ set(['test', 'u/test-admin', 'adobe-1', '--init--']))
+
+
+@td.with_user_project('test-admin')
+@with_setup(setUp)
+def test_user_projects_unnamed():
+    """
+    A ProjectRole that merely links a user to a project, without placing
+    the user in any named group of that project, must not make the
+    project appear in the user's my_projects().
+    """
+    subproject = M.Project.query.get(shortname='test/sub1')
+    M.ProjectRole(user_id=c.user._id, project_id=subproject._id)
+    ThreadLocalORMSession.flush_all()
+    shortnames = []
+    for project in c.user.my_projects():
+        shortnames.append(project.shortname)
+    assert_not_in('test/sub1', shortnames)
+    assert_in('test', shortnames)
+
+
+@patch.object(g, 'user_message_max_messages', 3)
+def test_check_sent_user_message_times():
+ # With the message limit patched to 3, checks can_send_user_message()
+ # against a list of previously recorded send times.
+ user1 = M.User.register(dict(username='test-user'), make_project=False)
+ time1 = datetime.utcnow() - timedelta(minutes=30)
+ time2 = datetime.utcnow() - timedelta(minutes=45)
+ time3 = datetime.utcnow() - timedelta(minutes=70)
+ user1.sent_user_message_times = [time1, time2, time3]
+ assert user1.can_send_user_message()
+ # Only two entries remain after the check; presumably the 70-minute-old
+ # one falls outside the tracking window and is pruned -- confirm.
+ assert_equal(len(user1.sent_user_message_times), 2)
+ user1.sent_user_message_times.append(
+ datetime.utcnow() - timedelta(minutes=15))
+ assert not user1.can_send_user_message()
+
+
+@td.with_user_project('test-admin')
+@with_setup(setUp)
+def test_user_track_active():
+ # User.track_active(req) records session date, IP and user agent in
+ # last_access; the user is re-fetched after every call so the assertions
+ # look at a freshly loaded object.
+ # without this session flushing inside track_active raises Exception
+ setup_functional_test()
+ c.user = M.User.by_username('test-admin')
+
+ assert_equal(c.user.last_access['session_date'], None)
+ assert_equal(c.user.last_access['session_ip'], None)
+ assert_equal(c.user.last_access['session_ua'], None)
+
+ # Minimal stand-in for a request: only headers and remote_addr are set.
+ req = Mock(headers={'User-Agent': 'browser'}, remote_addr='addr')
+ c.user.track_active(req)
+ c.user = M.User.by_username(c.user.username)
+ assert_not_equal(c.user.last_access['session_date'], None)
+ assert_equal(c.user.last_access['session_ip'], 'addr')
+ assert_equal(c.user.last_access['session_ua'], 'browser')
+
+ # ensure that session activity tracked with a whole-day granularity
+ prev_date = c.user.last_access['session_date']
+ c.user.track_active(req)
+ c.user = M.User.by_username(c.user.username)
+ assert_equal(c.user.last_access['session_date'], prev_date)
+ # Backdate the stored session date by one day so the next call updates it.
+ yesterday = datetime.utcnow() - timedelta(1)
+ c.user.last_access['session_date'] = yesterday
+ session(c.user).flush(c.user)
+ c.user.track_active(req)
+ c.user = M.User.by_username(c.user.username)
+ assert_true(c.user.last_access['session_date'] > yesterday)
+
+ # ...or if IP or User Agent has changed
+ req.remote_addr = 'new addr'
+ c.user.track_active(req)
+ c.user = M.User.by_username(c.user.username)
+ assert_equal(c.user.last_access['session_ip'], 'new addr')
+ assert_equal(c.user.last_access['session_ua'], 'browser')
+ req.headers['User-Agent'] = 'new browser'
+ c.user.track_active(req)
+ c.user = M.User.by_username(c.user.username)
+ assert_equal(c.user.last_access['session_ip'], 'new addr')
+ assert_equal(c.user.last_access['session_ua'], 'new browser')
+
+
+@with_setup(setUp)
+def test_user_index():
+    """User.index() exposes the expected fields for the test-admin user."""
+    c.user.email_addresses = ['email1', 'email2']
+    c.user.set_pref('email_address', 'email2')
+    indexed = c.user.index()
+    assert_equal(indexed['id'], c.user.index_id())
+    assert_equal(indexed['title'], 'User test-admin')
+    assert_equal(indexed['type_s'], 'User')
+    assert_equal(indexed['username_s'], 'test-admin')
+    assert_equal(indexed['email_addresses_t'], 'email1 email2')
+    assert_equal(indexed['email_address_s'], 'email2')
+    assert_in('last_password_updated_dt', indexed)
+    assert_equal(indexed['disabled_b'], False)
+    for preference_key in ('results_per_page_i',
+                           'email_format_s',
+                           'disable_user_messages_b'):
+        assert_in(preference_key, indexed)
+    assert_equal(indexed['display_name_t'], 'Test Admin')
+    assert_equal(indexed['sex_s'], 'Unknown')
+    # For the remaining fields only their presence is checked.
+    for present_key in ('birthdate_dt',
+                        'localization_s',
+                        'timezone_s',
+                        'socialnetworks_t',
+                        'telnumbers_t',
+                        'skypeaccount_s',
+                        'webpages_t',
+                        'skills_t',
+                        'last_access_login_date_dt',
+                        'last_access_login_ip_s',
+                        'last_access_login_ua_t',
+                        'last_access_session_date_dt',
+                        'last_access_session_ip_s',
+                        'last_access_session_ua_t'):
+        assert_in(present_key, indexed)
+    # provided by auth provider
+    assert_in('user_registration_date_dt', indexed)
+
+@with_setup(setUp)
+def test_user_index_none_values():
+    """None entries in list-valued fields are indexed as empty strings."""
+    c.user.email_addresses = [None]
+    for pref_name in ('telnumbers', 'webpages'):
+        c.user.set_pref(pref_name, [None])
+    indexed = c.user.index()
+    for field in ('email_addresses_t', 'telnumbers_t', 'webpages_t'):
+        assert_equal(indexed[field], '')
http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/model/test_discussion.py
----------------------------------------------------------------------
diff --git a/tests/model/test_discussion.py b/tests/model/test_discussion.py
new file mode 100644
index 0000000..ab9c610
--- /dev/null
+++ b/tests/model/test_discussion.py
@@ -0,0 +1,519 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Model tests for artifact
+"""
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
+from io import StringIO
+import time
+from datetime import datetime, timedelta
+from cgi import FieldStorage
+
+from pylons import tmpl_context as c
+from nose.tools import assert_equals, with_setup
+import mock
+from mock import patch
+from nose.tools import assert_equal
+
+from ming.orm import session, ThreadLocalORMSession
+from webob import exc
+
+from allura import model as M
+from allura.lib import helpers as h
+from allura.tests import TestController
+from alluratest.controller import setup_global_objects
+
+
+def setUp():
+ # Per-test fixture: start a TestController, request the wiki home page,
+ # install the global objects and point the context at the 'wiki' app of
+ # the 'test' project in the 'Projects' neighborhood. Sessions are closed
+ # before and after so each test starts from a clean ORM state.
+ controller = TestController()
+ controller.setUp()
+ controller.app.get('/wiki/Home/')
+ setup_global_objects()
+ ThreadLocalORMSession.close_all()
+ h.set_context('test', 'wiki', neighborhood='Projects')
+ ThreadLocalORMSession.flush_all()
+ ThreadLocalORMSession.close_all()
+
+
+def tearDown():
+ # Per-test teardown: close every thread-local ORM session.
+ ThreadLocalORMSession.close_all()
+
+
+@with_setup(setUp, tearDown)
+def test_discussion_methods():
+    """Smoke-test the basic Discussion API on a freshly created discussion.
+
+    Checks the associated thread/post/attachment classes, the state of an
+    empty discussion (no last post, no subscription, no posts), its URL,
+    index and JSON representations, and finally deletes it.
+    """
+    d = M.Discussion(shortname='test', name='test')
+    assert d.thread_class() == M.Thread
+    assert d.post_class() == M.Post
+    assert d.attachment_class() == M.DiscussionAttachment
+    ThreadLocalORMSession.flush_all()
+    d.update_stats()
+    ThreadLocalORMSession.flush_all()
+    # Identity checks: "no value" is tested with `is None`, not `== None`.
+    assert d.last_post is None
+    assert d.url().endswith('wiki/_discuss/')
+    assert d.index()['name_s'] == 'test'
+    assert d.subscription() is None
+    assert d.find_posts().count() == 0
+    jsn = d.__json__()
+    assert jsn['name'] == d.name
+    d.delete()
+    ThreadLocalORMSession.flush_all()
+    ThreadLocalORMSession.close_all()
+
+
+@with_setup(setUp, tearDown)
+def test_thread_methods():
+ # Exercises the Thread API: associated classes, posting and replying,
+ # find_posts() in both styles, URL/index/JSON output, the subscription
+ # flag, reply counting, and marking the whole thread as spam.
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+ assert t.discussion_class() == M.Discussion
+ assert t.post_class() == M.Post
+ assert t.attachment_class() == M.DiscussionAttachment
+ p0 = t.post('This is a post')
+ p1 = t.post('This is another post')
+ # Short pause before the reply; presumably to give it a clearly later
+ # timestamp than the posts above -- confirm.
+ time.sleep(0.25)
+ t.post('This is a reply', parent_id=p0._id)
+ ThreadLocalORMSession.flush_all()
+ ThreadLocalORMSession.close_all()
+ # Re-load discussion and thread from the database.
+ d = M.Discussion.query.get(shortname='test')
+ t = d.threads[0]
+ assert d.last_post is not None
+ assert t.last_post is not None
+ t.create_post_threads(t.posts)
+ posts0 = t.find_posts(page=0, limit=10, style='threaded')
+ posts1 = t.find_posts(page=0, limit=10, style='timestamp')
+ assert posts0 != posts1
+ # Truncate the timestamp to millisecond precision before querying with it.
+ ts = p0.timestamp.replace(
+ microsecond=int(p0.timestamp.microsecond // 1000) * 1000)
+ posts2 = t.find_posts(page=0, limit=10, style='threaded', timestamp=ts)
+ assert len(posts2) > 0
+
+ assert 'wiki/_discuss/' in t.url()
+ assert t.index()['views_i'] == 0
+ assert not t.subscription
+ t.subscription = True
+ assert t.subscription
+ t.subscription = False
+ assert not t.subscription
+ assert t.post_count == 3
+ jsn = t.__json__()
+ assert '_id' in jsn
+ assert_equals(len(jsn['posts']), 3)
+ # NOTE(review): the next line only builds a generator expression that is
+ # never iterated, so approve() is not actually called on p0/p1. Turning
+ # it into a real loop would change what this test exercises -- confirm
+ # the intent before fixing.
+ (p.approve() for p in (p0, p1))
+ assert t.num_replies == 2
+ t.spam()
+ assert t.num_replies == 0
+ ThreadLocalORMSession.flush_all()
+ assert len(t.find_posts()) == 0
+ t.delete()
+
+
+@with_setup(setUp, tearDown)
+def test_thread_new():
+ # Thread.new() must end up with distinct ids even when the nonce helper
+ # returns the same value twice in a row.
+ with mock.patch('allura.model.discuss.h.nonce') as nonce:
+ # The second nonce duplicates the first; the assertions below show
+ # the second thread ends up with the third value instead.
+ nonce.side_effect = ['deadbeef', 'deadbeef', 'beefdead']
+ d = M.Discussion(shortname='test', name='test')
+ t1 = M.Thread.new(discussion_id=d._id, subject='Test Thread One')
+ t2 = M.Thread.new(discussion_id=d._id, subject='Test Thread Two')
+ ThreadLocalORMSession.flush_all()
+ # Drop both objects from the session so the lookups below hit the database.
+ session(t1).expunge(t1)
+ session(t2).expunge(t2)
+ t1_2 = M.Thread.query.get(_id=t1._id)
+ t2_2 = M.Thread.query.get(_id=t2._id)
+ assert_equals(t1._id, 'deadbeef')
+ assert_equals(t2._id, 'beefdead')
+ assert_equals(t1_2.subject, 'Test Thread One')
+ assert_equals(t2_2.subject, 'Test Thread Two')
+
+
+@with_setup(setUp, tearDown)
+def test_post_methods():
+ # Exercises the Post API: associated classes, commit, subject and link
+ # helpers, history snapshots, JSON output, and how spam()/delete()
+ # affect the thread's reply count.
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+ p = t.post('This is a post')
+ p2 = t.post('This is another post')
+ assert p.discussion_class() == M.Discussion
+ assert p.thread_class() == M.Thread
+ assert p.attachment_class() == M.DiscussionAttachment
+ p.commit()
+ assert p.parent is None
+ assert p.subject == 'Test Thread'
+ assert_equals(p.attachments, [])
+ assert 'wiki/_discuss' in p.url()
+ assert p.reply_subject() == 'Re: Test Thread'
+ assert p.link_text() == p.subject
+
+ # First history snapshot of the post.
+ ss = p.history().first()
+ assert 'version' in h.get_first(ss.index(), 'title')
+ assert '#' in ss.shorthand_id()
+
+ jsn = p.__json__()
+ assert jsn["thread_id"] == t._id
+
+ # NOTE(review): the next line only builds a generator expression that is
+ # never iterated, so approve() is not actually called on p/p2. Turning
+ # it into a real loop would change what this test exercises -- confirm
+ # the intent before fixing.
+ (p.approve() for p in (p, p2))
+ assert t.num_replies == 1
+ p2.spam()
+ assert t.num_replies == 0
+ p.spam()
+ assert t.num_replies == 0
+ p.delete()
+ assert t.num_replies == 0
+
+
+@with_setup(setUp, tearDown)
+def test_attachment_methods():
+ # Attachments created at post, thread and discussion level resolve their
+ # parents and URLs; a post created with an uploaded file mentions the
+ # attachment in the notification text.
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+ p = t.post('This is a post')
+ p_att = p.attach('foo.text', StringIO('Hello, world!'),
+ discussion_id=d._id,
+ thread_id=t._id,
+ post_id=p._id)
+ t_att = p.attach('foo2.text', StringIO('Hello, thread!'),
+ discussion_id=d._id,
+ thread_id=t._id)
+ d_att = p.attach('foo3.text', StringIO('Hello, discussion!'),
+ discussion_id=d._id)
+
+ ThreadLocalORMSession.flush_all()
+ assert p_att.post == p
+ assert p_att.thread == t
+ assert p_att.discussion == d
+ for att in (p_att, t_att, d_att):
+ assert 'wiki/_discuss' in att.url()
+ assert 'attachment/' in att.url()
+
+ # Test notification in mail
+ t = M.Thread.new(discussion_id=d._id, subject='Test comment notification')
+ # Hand-built FieldStorage standing in for an uploaded form file.
+ fs = FieldStorage()
+ fs.name = 'file_info'
+ fs.filename = 'fake.txt'
+ fs.type = 'text/plain'
+ fs.file = StringIO('this is the content of the fake file\n')
+ p = t.post(text='test message', forum=None, subject='', file_info=fs)
+ ThreadLocalORMSession.flush_all()
+ n = M.Notification.query.get(
+ subject='[test:wiki] Test comment notification')
+ assert '\nAttachment: fake.txt (37 Bytes; text/plain)' in n.text
+
+
+# Pass the tearDown function itself: calling it here would run it once at
+# import time and register its return value (None) as the teardown.
+@with_setup(setUp, tearDown)
+def test_multiple_attachments():
+    """Post.add_multiple_attachments() stores every uploaded file on the post."""
+    # Two hand-built FieldStorage objects standing in for uploaded form files.
+    test_file1 = FieldStorage()
+    test_file1.name = 'file_info'
+    test_file1.filename = 'test1.txt'
+    test_file1.type = 'text/plain'
+    test_file1.file = StringIO('test file1\n')
+    test_file2 = FieldStorage()
+    test_file2.name = 'file_info'
+    test_file2.filename = 'test2.txt'
+    test_file2.type = 'text/plain'
+    test_file2.file = StringIO('test file2\n')
+    d = M.Discussion(shortname='test', name='test')
+    t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+    test_post = t.post('test post')
+    test_post.add_multiple_attachments([test_file1, test_file2])
+    ThreadLocalORMSession.flush_all()
+    assert_equals(len(test_post.attachments), 2)
+    attaches = test_post.attachments
+    # The order of the stored attachments is not asserted.
+    assert 'test1.txt' in [attaches[0].filename, attaches[1].filename]
+    assert 'test2.txt' in [attaches[0].filename, attaches[1].filename]
+
+
+@with_setup(setUp, tearDown)
+def test_add_attachment():
+ # Post.add_attachment() stores a single uploaded file with its filename
+ # and content type.
+ # Hand-built FieldStorage standing in for an uploaded form file.
+ test_file = FieldStorage()
+ test_file.name = 'file_info'
+ test_file.filename = 'test.txt'
+ test_file.type = 'text/plain'
+ test_file.file = StringIO('test file\n')
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+ test_post = t.post('test post')
+ test_post.add_attachment(test_file)
+ ThreadLocalORMSession.flush_all()
+ assert_equals(len(test_post.attachments), 1)
+ attach = test_post.attachments[0]
+ assert attach.filename == 'test.txt', attach.filename
+ assert attach.content_type == 'text/plain', attach.content_type
+
+
+def test_notification_two_attaches():
+ # A post created with two uploaded files lists both attachments on one
+ # line of the notification text.
+ # NOTE(review): unlike most tests in this module this one carries no
+ # @with_setup(setUp, tearDown) -- confirm that is intentional.
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread.new(discussion_id=d._id, subject='Test comment notification')
+ fs1 = FieldStorage()
+ fs1.name = 'file_info'
+ fs1.filename = 'fake.txt'
+ fs1.type = 'text/plain'
+ fs1.file = StringIO('this is the content of the fake file\n')
+ fs2 = FieldStorage()
+ fs2.name = 'file_info'
+ fs2.filename = 'fake2.txt'
+ fs2.type = 'text/plain'
+ fs2.file = StringIO('this is the content of the fake file\n')
+ t.post(text='test message', forum=None, subject='', file_info=[fs1, fs2])
+ ThreadLocalORMSession.flush_all()
+ n = M.Notification.query.get(
+ subject='[test:wiki] Test comment notification')
+ assert '\nAttachment: fake.txt (37 Bytes; text/plain) fake2.txt (37 Bytes; text/plain)' in n.text
+
+
+@with_setup(setUp, tearDown)
+def test_discussion_delete():
+ # Deleting a discussion (with a thread, a post and an attachment) also
+ # removes its ArtifactReference.
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+ p = t.post('This is a post')
+ p.attach('foo.text', StringIO(''),
+ discussion_id=d._id,
+ thread_id=t._id,
+ post_id=p._id)
+ M.ArtifactReference.from_artifact(d)
+ # Remember the index id: it is the _id used to look up the reference below.
+ rid = d.index_id()
+ ThreadLocalORMSession.flush_all()
+ d.delete()
+ ThreadLocalORMSession.flush_all()
+ assert_equals(M.ArtifactReference.query.find(dict(_id=rid)).count(), 0)
+
+
+@with_setup(setUp, tearDown)
+def test_thread_delete():
+    """Deleting a thread that holds a post with an attachment does not raise."""
+    discussion = M.Discussion(shortname='test', name='test')
+    thread = M.Thread.new(discussion_id=discussion._id, subject='Test Thread')
+    post = thread.post('This is a post')
+    empty_file = StringIO('')
+    post.attach('foo.text', empty_file,
+                discussion_id=discussion._id,
+                thread_id=thread._id,
+                post_id=post._id)
+    ThreadLocalORMSession.flush_all()
+    thread.delete()
+
+
+@with_setup(setUp, tearDown)
+def test_post_delete():
+    """Deleting a post that carries an attachment does not raise."""
+    discussion = M.Discussion(shortname='test', name='test')
+    thread = M.Thread.new(discussion_id=discussion._id, subject='Test Thread')
+    post = thread.post('This is a post')
+    empty_file = StringIO('')
+    post.attach('foo.text', empty_file,
+                discussion_id=discussion._id,
+                thread_id=thread._id,
+                post_id=post._id)
+    ThreadLocalORMSession.flush_all()
+    post.delete()
+
+
+@with_setup(setUp, tearDown)
+def test_post_permission_check():
+ # Posting as the anonymous user raises HTTPUnauthorized unless the
+ # security check is explicitly bypassed.
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+ c.user = M.User.anonymous()
+ try:
+ t.post('This post will fail the check.')
+ # Reached only if post() did not raise.
+ assert False, "Expected an anonymous post to fail."
+ except exc.HTTPUnauthorized:
+ pass
+ t.post('This post will pass the check.', ignore_security=True)
+
+
+@with_setup(setUp, tearDown)
+def test_post_url_paginated():
+    """Check Post.url_paginated() for the default and a user-defined page size.
+
+    Builds five top-level posts plus six replies, keeping the list ``p`` in
+    the order the posts are expected to be displayed, then verifies that
+    every post's paginated URL carries the right ``limit`` and ``page``.
+    """
+    d = M.Discussion(shortname='test', name='test')
+    t = M.Thread(discussion_id=d._id, subject='Test Thread')
+    p = []  # posts in display order
+    ts = datetime.utcnow() - timedelta(days=1)
+    for i in range(5):
+        ts += timedelta(minutes=1)
+        p.append(t.post('This is a post #%s' % i, timestamp=ts))
+
+    # Each reply is inserted at the position it takes in display order,
+    # i.e. below its parent and the parent's earlier replies.
+    ts += timedelta(minutes=1)
+    p.insert(1, t.post(
+        'This is reply #0 to post #0', parent_id=p[0]._id, timestamp=ts))
+
+    ts += timedelta(minutes=1)
+    p.insert(2, t.post(
+        'This is reply #1 to post #0', parent_id=p[0]._id, timestamp=ts))
+
+    ts += timedelta(minutes=1)
+    p.insert(4, t.post(
+        'This is reply #0 to post #1', parent_id=p[3]._id, timestamp=ts))
+
+    ts += timedelta(minutes=1)
+    p.insert(6, t.post(
+        'This is reply #0 to post #2', parent_id=p[5]._id, timestamp=ts))
+
+    ts += timedelta(minutes=1)
+    p.insert(7, t.post(
+        'This is reply #1 to post #2', parent_id=p[5]._id, timestamp=ts))
+
+    ts += timedelta(minutes=1)
+    p.insert(8, t.post(
+        'This is reply #0 to reply #1 to post #2',
+        parent_id=p[7]._id, timestamp=ts))
+
+    # with default paging limit
+    for _p in p:
+        url = t.url() + '?limit=25#' + _p.slug
+        assert _p.url_paginated() == url, _p.url_paginated()
+
+    # with user paging limit
+    limit = 3
+    c.user.set_pref('results_per_page', limit)
+    for i, _p in enumerate(p):
+        # Floor division is required: this module enables true division
+        # (``from __future__ import division``), and ``i / limit`` would
+        # yield fractional page numbers such as 0.333.
+        page = i // limit
+        url = t.url() + '?limit=%s' % limit
+        if page > 0:
+            url += '&page=%s' % page
+        url += '#' + _p.slug
+        assert _p.url_paginated() == url, _p.url_paginated()
+
+
+@with_setup(setUp, tearDown)
+def test_post_url_paginated_with_artifact():
+    """Post.url_paginated should return link to attached artifact, if any"""
+    from forgewiki.model import Page
+    wiki_page = Page.upsert(title='Test Page')
+    comment = wiki_page.discussion_thread.post('Comment')
+    # The expected link is built on the page's URL.
+    expected_url = wiki_page.url() + '?limit=25#' + comment.slug
+    assert_equals(comment.url_paginated(), expected_url)
+
+
+@with_setup(setUp, tearDown)
+def test_post_notify():
+ # A new post is sent to the discussion's monitoring email, unless
+ # notifications are disabled on the project.
+ d = M.Discussion(shortname='test', name='test')
+ d.monitoring_email = 'darthvader@deathstar.org'
+ t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+ with patch('allura.model.notification.Notification.send_simple') as send:
+ t.post('This is a post')
+ send.assert_called_with(d.monitoring_email)
+
+ c.app.config.project.notifications_disabled = True
+ with patch('allura.model.notification.Notification.send_simple') as send:
+ t.post('Another post')
+ # Inverted check: assert_called_with() is expected to fail here.
+ # NOTE(review): this also passes if send_simple was called with some
+ # other argument, so it is weaker than a call-count check.
+ try:
+ send.assert_called_with(d.monitoring_email)
+ except AssertionError:
+ pass # method not called as expected
+ else:
+ assert False, 'send_simple must not be called'
+
+
+@with_setup(setUp, tearDown)
+@patch('allura.model.discuss.c.project.users_with_role')
+def test_is_spam_for_admin(users):
+ # Thread.is_spam() is false for a post when the current user is among
+ # the users returned by the mocked project.users_with_role().
+ users.return_value = [c.user, ]
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread(discussion_id=d._id, subject='Test Thread')
+ t.post('This is a post')
+ post = M.Post.query.get(text='This is a post')
+ assert not t.is_spam(post), t.is_spam(post)
+
+
+@with_setup(setUp, tearDown)
+@patch('allura.model.discuss.c.project.users_with_role')
+def test_is_spam(role):
+    """Thread.is_spam() is true when the spam checker flags the post.
+
+    ``role`` is the mocked ``project.users_with_role``; it returns an empty
+    list so that no user holds the queried role.
+    """
+    d = M.Discussion(shortname='test', name='test')
+    t = M.Thread(discussion_id=d._id, subject='Test Thread')
+    role.return_value = []
+    with mock.patch('allura.controllers.discuss.g.spam_checker') as spam_checker:
+        spam_checker.check.return_value = True
+        post = mock.Mock()
+        # The assertion message is only evaluated on failure, so on the
+        # success path is_spam() -- and thus check() -- runs exactly once.
+        assert t.is_spam(post), t.is_spam(post)
+        # Report the counter that is actually being asserted on.
+        assert spam_checker.check.call_count == 1, spam_checker.check.call_count
+
+
+@with_setup(setUp, tearDown)
+@mock.patch('allura.controllers.discuss.g.spam_checker')
+def test_not_spam_and_has_unmoderated_post_permission(spam_checker):
+ # An anonymous post that is not flagged as spam, on a thread granting
+ # *anonymous both 'post' and 'unmoderated_post', gets status 'ok'.
+ spam_checker.check.return_value = False
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread(discussion_id=d._id, subject='Test Thread')
+ role = M.ProjectRole.by_name('*anonymous')._id
+ post_permission = M.ACE.allow(role, 'post')
+ unmoderated_post_permission = M.ACE.allow(role, 'unmoderated_post')
+ t.acl.append(post_permission)
+ t.acl.append(unmoderated_post_permission)
+ # Post as the anonymous user.
+ with h.push_config(c, user=M.User.anonymous()):
+ post = t.post('Hey')
+ assert_equal(post.status, 'ok')
+
+
+@with_setup(setUp, tearDown)
+@mock.patch('allura.controllers.discuss.g.spam_checker')
+@mock.patch.object(M.Thread, 'notify_moderators')
+def test_not_spam_but_has_no_unmoderated_post_permission(notify_moderators, spam_checker):
+    """A non-spam anonymous post without 'unmoderated_post' stays pending.
+
+    Stacked ``patch`` decorators are applied bottom-up, so the mock for
+    ``Thread.notify_moderators`` is injected first and the ``spam_checker``
+    mock second; the parameter order must follow that.
+    """
+    spam_checker.check.return_value = False
+    d = M.Discussion(shortname='test', name='test')
+    t = M.Thread(discussion_id=d._id, subject='Test Thread')
+    # Grant *anonymous the 'post' permission only.
+    role = M.ProjectRole.by_name('*anonymous')._id
+    post_permission = M.ACE.allow(role, 'post')
+    t.acl.append(post_permission)
+    with h.push_config(c, user=M.User.anonymous()):
+        post = t.post('Hey')
+    assert_equal(post.status, 'pending')
+    notify_moderators.assert_called_once()
+
+
+@with_setup(setUp, tearDown)
+@mock.patch('allura.controllers.discuss.g.spam_checker')
+@mock.patch.object(M.Thread, 'notify_moderators')
+def test_spam_and_has_unmoderated_post_permission(notify_moderators, spam_checker):
+    """A post flagged as spam stays pending even with 'unmoderated_post'.
+
+    Stacked ``patch`` decorators are applied bottom-up, so the mock for
+    ``Thread.notify_moderators`` is injected first and the ``spam_checker``
+    mock second; the parameter order must follow that.
+    """
+    spam_checker.check.return_value = True
+    d = M.Discussion(shortname='test', name='test')
+    t = M.Thread(discussion_id=d._id, subject='Test Thread')
+    # Grant *anonymous both 'post' and 'unmoderated_post'.
+    role = M.ProjectRole.by_name('*anonymous')._id
+    post_permission = M.ACE.allow(role, 'post')
+    unmoderated_post_permission = M.ACE.allow(role, 'unmoderated_post')
+    t.acl.append(post_permission)
+    t.acl.append(unmoderated_post_permission)
+    with h.push_config(c, user=M.User.anonymous()):
+        post = t.post('Hey')
+    assert_equal(post.status, 'pending')
+    notify_moderators.assert_called_once()
+
+
+@with_setup(setUp, tearDown)
+@mock.patch('allura.controllers.discuss.g.spam_checker')
+def test_thread_subject_not_included_in_text_checked(spam_checker):
+ # The text handed to the spam checker is the post text alone, without
+ # the thread subject.
+ spam_checker.check.return_value = False
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread(discussion_id=d._id, subject='Test Thread')
+ t.post('Hello')
+ spam_checker.check.assert_called_once()
+ # First positional argument of the check() call.
+ assert_equal(spam_checker.check.call_args[0][0], 'Hello')
+
+
+def test_post_count():
+    """Thread.post_count is 2 for a thread holding one spam, one ok and one pending post."""
+    discussion = M.Discussion(shortname='test', name='test')
+    thread = M.Thread(discussion_id=discussion._id, subject='Test Thread')
+    for status in ('spam', 'ok', 'pending'):
+        M.Post(discussion_id=discussion._id, thread_id=thread._id, status=status)
+    ThreadLocalORMSession.flush_all()
+    assert_equal(thread.post_count, 2)
+
+
+@mock.patch('allura.controllers.discuss.g.spam_checker')
+def test_spam_num_replies(spam_checker):
+ # Calling spam() on a post lowers the thread's num_replies by one (2 -> 1).
+ d = M.Discussion(shortname='test', name='test')
+ t = M.Thread(discussion_id=d._id, subject='Test Thread', num_replies=2)
+ p1 = M.Post(discussion_id=d._id, thread_id=t._id, status='spam')
+ p1.spam()
+ assert_equal(t.num_replies, 1)
http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/model/test_filesystem.py
----------------------------------------------------------------------
diff --git a/tests/model/test_filesystem.py b/tests/model/test_filesystem.py
new file mode 100644
index 0000000..f9e2c9e
--- /dev/null
+++ b/tests/model/test_filesystem.py
@@ -0,0 +1,220 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
+import os
+from unittest import TestCase
+from io import StringIO
+from io import BytesIO
+
+from pylons import tmpl_context as c
+from ming.orm import session, Mapper
+from nose.tools import assert_equal
+from mock import patch
+from webob import Request, Response
+
+from allura import model as M
+from alluratest.controller import setup_unit_test
+
+
+class File(M.File):
+
+ class __mongometa__:
+ session = M.session.main_orm_session
+Mapper.compile_all()
+
+
+class TestFile(TestCase):
+
+ def setUp(self):
+ setup_unit_test()
+ self.session = session(File)
+ self.conn = M.session.main_doc_session.db._connection
+ self.db = M.session.main_doc_session.db
+
+ self.db.fs.remove()
+ self.db.fs.files.remove()
+ self.db.fs.chunks.remove()
+
+ def test_from_stream(self):
+ f = File.from_stream('test1.txt', StringIO('test1'))
+ self.session.flush()
+ assert self.db.fs.count() == 1
+ assert self.db.fs.files.count() == 1
+ assert self.db.fs.chunks.count() == 1
+ assert f.filename == 'test1.txt'
+ assert f.content_type == 'text/plain'
+ self._assert_content(f, 'test1')
+
+ def test_from_data(self):
+ f = File.from_data('test2.txt', 'test2')
+ self.session.flush(f)
+ assert self.db.fs.count() == 1
+ assert self.db.fs.files.count() == 1
+ assert self.db.fs.chunks.count() == 1
+ assert f.filename == 'test2.txt'
+ assert f.content_type == 'text/plain'
+ self._assert_content(f, 'test2')
+
+ def test_from_path(self):
+ path = __file__.rstrip('c')
+ f = File.from_path(path)
+ self.session.flush()
+ assert self.db.fs.count() == 1
+ assert self.db.fs.files.count() == 1
+ assert self.db.fs.chunks.count() >= 1
+ assert f.filename == os.path.basename(path)
+ text = f.rfile().read()
+ assert text.startswith('# -*-')
+
+ def test_delete(self):
+ f = File.from_data('test1.txt', 'test1')
+ self.session.flush()
+ assert self.db.fs.count() == 1
+ assert self.db.fs.files.count() == 1
+ assert self.db.fs.chunks.count() == 1
+ f.delete()
+ self.session.flush()
+ assert self.db.fs.count() == 0
+ assert self.db.fs.files.count() == 0
+ assert self.db.fs.chunks.count() == 0
+
+ def test_remove(self):
+ File.from_data('test1.txt', 'test1')
+ File.from_data('test2.txt', 'test2')
+ self.session.flush()
+ assert self.db.fs.count() == 2
+ assert self.db.fs.files.count() == 2
+ assert self.db.fs.chunks.count() == 2
+ File.remove(dict(filename='test1.txt'))
+ self.session.flush()
+ assert self.db.fs.count() == 1
+ assert self.db.fs.files.count() == 1
+ assert self.db.fs.chunks.count() == 1
+
+ def test_overwrite(self):
+ f = File.from_data('test1.txt', 'test1')
+ self.session.flush()
+ assert self.db.fs.count() == 1
+ assert self.db.fs.files.count() == 1
+ assert self.db.fs.chunks.count() == 1
+ self._assert_content(f, 'test1')
+ with f.wfile() as fp:
+ fp.write('test2')
+ self.session.flush()
+ assert self.db.fs.count() == 1
+ assert self.db.fs.files.count() == 2
+ assert self.db.fs.chunks.count() == 2
+ self._assert_content(f, 'test2')
+
+ def test_serve_embed(self):
+ f = File.from_data('te s\u0b6e1.txt', 'test1')
+ self.session.flush()
+ with patch('allura.lib.utils.tg.request', Request.blank('/')), \
+ patch('allura.lib.utils.pylons.response', Response()) as response, \
+ patch('allura.lib.utils.etag_cache') as etag_cache:
+ response_body = list(f.serve())
+ etag_cache.assert_called_once_with('{}?{}'.format(f.filename,
+ f._id.generation_time).encode('utf-8'))
+ assert_equal(['test1'], response_body)
+ assert_equal(response.content_type, f.content_type)
+ assert 'Content-Disposition' not in response.headers
+
+ def test_serve_embed_false(self):
+ f = File.from_data('te s\u0b6e1.txt', 'test1')
+ self.session.flush()
+ with patch('allura.lib.utils.tg.request', Request.blank('/')), \
+ patch('allura.lib.utils.pylons.response', Response()) as response, \
+ patch('allura.lib.utils.etag_cache') as etag_cache:
+ response_body = list(f.serve(embed=False))
+ etag_cache.assert_called_once_with('{}?{}'.format(f.filename,
+ f._id.generation_time).encode('utf-8'))
+ assert_equal(['test1'], response_body)
+ assert_equal(response.content_type, f.content_type)
+ assert_equal(response.headers['Content-Disposition'],
+ 'attachment;filename="te s\xe0\xad\xae1.txt"')
+
+ def test_image(self):
+ path = os.path.join(
+ os.path.dirname(__file__), '..', 'data', 'user.png')
+ with open(path) as fp:
+ f, t = File.save_image(
+ 'user.png',
+ fp,
+ thumbnail_size=(16, 16),
+ square=True,
+ save_original=True)
+ self.session.flush()
+ assert f.content_type == 'image/png'
+ assert f.is_image()
+ assert t.content_type == 'image/png'
+ assert t.is_image()
+ assert f.filename == t.filename
+ assert self.db.fs.count() == 2
+ assert self.db.fs.files.count() == 2
+ assert self.db.fs.chunks.count() == 2
+
+ def test_not_image(self):
+ f, t = File.save_image(
+ 'file.txt',
+ StringIO('blah'),
+ thumbnail_size=(16, 16),
+ square=True,
+ save_original=True)
+ assert f == None
+ assert t == None
+
+ def test_invalid_image(self):
+ f, t = File.save_image(
+ 'bogus.png',
+ StringIO('bogus data here!'),
+ thumbnail_size=(16, 16),
+ square=True,
+ save_original=True)
+ assert f == None
+ assert t == None
+
+ def test_partial_image_as_attachment(self):
+ path = os.path.join(os.path.dirname(__file__),
+ '..', 'data', 'user.png')
+ fp = BytesIO(open(path, 'rb').read(500))
+ c.app.config._id = None
+ attachment = M.BaseAttachment.save_attachment('user.png', fp,
+ save_original=True)
+ assert type(attachment) != tuple # tuple is for (img, thumb) pairs
+ assert_equal(attachment.length, 500)
+ assert_equal(attachment.filename, 'user.png')
+
+ def test_attachment_name_encoding(self):
+ path = os.path.join(os.path.dirname(__file__),
+ '..', 'data', 'user.png')
+ fp = open(path, 'rb')
+ c.app.config._id = None
+ attachment = M.BaseAttachment.save_attachment(
+ b'Strukturpr\xfcfung.dvi', fp,
+ save_original=True)
+ assert type(attachment) != tuple # tuple is for (img, thumb) pairs
+ assert_equal(attachment.filename, 'Strukturpr\xfcfung.dvi')
+
+ def _assert_content(self, f, content):
+ result = f.rfile().read()
+ assert result == content, result
http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/model/test_monq.py
----------------------------------------------------------------------
diff --git a/tests/model/test_monq.py b/tests/model/test_monq.py
new file mode 100644
index 0000000..055bb91
--- /dev/null
+++ b/tests/model/test_monq.py
@@ -0,0 +1,46 @@
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import pprint
+from nose.tools import with_setup
+
+from ming.orm import ThreadLocalORMSession
+
+from alluratest.controller import setup_basic_test, setup_global_objects
+from allura import model as M
+
+
+def setUp():
+ setup_basic_test()
+ ThreadLocalORMSession.close_all()
+ setup_global_objects()
+ M.MonQTask.query.remove({})
+
+
+@with_setup(setUp)
+def test_basic_task():
+ task = M.MonQTask.post(pprint.pformat, ([5, 6],))
+ ThreadLocalORMSession.flush_all()
+ ThreadLocalORMSession.close_all()
+ task = M.MonQTask.get()
+ assert task
+ task()
+ assert task.result == 'I[5, 6]', task.result
http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/model/test_neighborhood.py
----------------------------------------------------------------------
diff --git a/tests/model/test_neighborhood.py b/tests/model/test_neighborhood.py
new file mode 100644
index 0000000..0177c26
--- /dev/null
+++ b/tests/model/test_neighborhood.py
@@ -0,0 +1,97 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Model tests for neighborhood
+"""
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
+from nose.tools import with_setup
+
+from allura import model as M
+from allura.tests import decorators as td
+from alluratest.controller import setup_basic_test, setup_global_objects
+
+
+def setUp():
+ setup_basic_test()
+ setup_with_tools()
+
+
+@td.with_wiki
+def setup_with_tools():
+ setup_global_objects()
+
+
+@with_setup(setUp)
+def test_neighborhood():
+ neighborhood = M.Neighborhood.query.get(name='Projects')
+ # Check css output depends of neighborhood level
+ test_css = ".text{color:#000;}"
+ neighborhood.css = test_css
+ neighborhood.features['css'] = 'none'
+ assert neighborhood.get_custom_css() == ""
+ neighborhood.features['css'] = 'picker'
+ assert neighborhood.get_custom_css() == test_css
+ neighborhood.features['css'] = 'custom'
+ assert neighborhood.get_custom_css() == test_css
+ # Check max projects
+ neighborhood.features['max_projects'] = None
+ assert neighborhood.get_max_projects() is None
+ neighborhood.features['max_projects'] = 500
+ assert neighborhood.get_max_projects() == 500
+
+ # Check picker css styles
+ test_css_dict = {'barontop': '#444',
+ 'titlebarbackground': '#555',
+ 'projecttitlefont': 'arial,sans-serif',
+ 'projecttitlecolor': '#333',
+ 'titlebarcolor': '#666',
+ 'addopt-icon-theme': 'dark'}
+ css_text = neighborhood.compile_css_for_picker(test_css_dict)
+ assert '#333' in css_text
+ assert '#444' in css_text
+ assert '#555' in css_text
+ assert '#666' in css_text
+ assert 'arial,sans-serif' in css_text
+ assert 'images/neo-icon-set-ffffff-256x350.png' in css_text
+ neighborhood.css = css_text
+ styles_list = neighborhood.get_css_for_picker()
+ for style in styles_list:
+ assert test_css_dict[style['name']] == style['value']
+ if style['name'] == 'titlebarcolor':
+ assert '<option value="dark" selected="selected">' in style[
+ 'additional']
+
+ # Check neighborhood custom css showing
+ neighborhood.features['css'] = 'none'
+ assert not neighborhood.allow_custom_css
+ neighborhood.features['css'] = 'picker'
+ assert neighborhood.allow_custom_css
+ neighborhood.features['css'] = 'custom'
+ assert neighborhood.allow_custom_css
+
+ neighborhood.anchored_tools = 'wiki:Wiki, tickets:Tickets'
+ assert neighborhood.get_anchored_tools()['wiki'] == 'Wiki'
+ assert neighborhood.get_anchored_tools()['tickets'] == 'Tickets'
+
+ neighborhood.prohibited_tools = 'wiki, tickets'
+ assert neighborhood.get_prohibited_tools() == ['wiki', 'tickets']
http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/model/test_notification.py
----------------------------------------------------------------------
diff --git a/tests/model/test_notification.py b/tests/model/test_notification.py
new file mode 100644
index 0000000..e5c1aca
--- /dev/null
+++ b/tests/model/test_notification.py
@@ -0,0 +1,487 @@
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from datetime import timedelta
+import collections
+
+from pylons import tmpl_context as c, app_globals as g
+from nose.tools import assert_equal, assert_in
+from ming.orm import ThreadLocalORMSession
+import mock
+import bson
+
+from alluratest.controller import setup_basic_test, setup_global_objects
+from allura import model as M
+from allura.model.notification import MailFooter
+from allura.lib import helpers as h
+from allura.tests import decorators as td
+from forgewiki import model as WM
+
+
+class TestNotification(unittest.TestCase):
+
+ def setUp(self):
+ setup_basic_test()
+ self.setup_with_tools()
+
+ @td.with_wiki
+ def setup_with_tools(self):
+ setup_global_objects()
+ _clear_subscriptions()
+ _clear_notifications()
+ ThreadLocalORMSession.flush_all()
+ ThreadLocalORMSession.close_all()
+ M.notification.MAILBOX_QUIESCENT = None # disable message combining
+
+ def test_subscribe_unsubscribe(self):
+ M.Mailbox.subscribe(type='direct')
+ ThreadLocalORMSession.flush_all()
+ ThreadLocalORMSession.close_all()
+ subscriptions = M.Mailbox.query.find(dict(
+ project_id=c.project._id,
+ app_config_id=c.app.config._id,
+ user_id=c.user._id)).all()
+ assert len(subscriptions) == 1
+ assert subscriptions[0].type == 'direct'
+ assert M.Mailbox.query.find().count() == 1
+ M.Mailbox.unsubscribe()
+ ThreadLocalORMSession.flush_all()
+ ThreadLocalORMSession.close_all()
+ subscriptions = M.Mailbox.query.find(dict(
+ project_id=c.project._id,
+ app_config_id=c.app.config._id,
+ user_id=c.user._id)).all()
+ assert len(subscriptions) == 0
+ assert M.Mailbox.query.find().count() == 0
+
+ @mock.patch('allura.tasks.mail_tasks.sendmail')
+ def test_send_direct(self, sendmail):
+ c.user = M.User.query.get(username='test-user')
+ wiki = c.project.app_instance('wiki')
+ page = WM.Page.query.get(app_config_id=wiki.config._id)
+ notification = M.Notification(
+ _id='_id',
+ ref=page.ref,
+ from_address='from_address',
+ reply_to_address='reply_to_address',
+ in_reply_to='in_reply_to',
+ references=['a'],
+ subject='subject',
+ text='text',
+ )
+ notification.footer = lambda: ' footer'
+ notification.send_direct(c.user._id)
+ sendmail.post.assert_called_once_with(
+ destinations=[str(c.user._id)],
+ fromaddr='from_address',
+ reply_to='reply_to_address',
+ subject='subject',
+ message_id='_id',
+ in_reply_to='in_reply_to',
+ references=['a'],
+ sender='wiki@test.p.in.localhost',
+ text='text footer',
+ )
+
+ @mock.patch('allura.tasks.mail_tasks.sendmail')
+ def test_send_direct_no_access(self, sendmail):
+ c.user = M.User.query.get(username='test-user')
+ wiki = c.project.app_instance('wiki')
+ page = WM.Page.query.get(app_config_id=wiki.config._id)
+ page.parent_security_context().acl = []
+ ThreadLocalORMSession.flush_all()
+ ThreadLocalORMSession.close_all()
+ notification = M.Notification(
+ _id='_id',
+ ref=page.ref,
+ from_address='from_address',
+ reply_to_address='reply_to_address',
+ in_reply_to='in_reply_to',
+ subject='subject',
+ text='text',
+ )
+ notification.footer = lambda: ' footer'
+ notification.send_direct(c.user._id)
+ assert_equal(sendmail.post.call_count, 0)
+
+ @mock.patch('allura.tasks.mail_tasks.sendmail')
+ def test_send_direct_wrong_project_context(self, sendmail):
+ """
+ Test that Notification.send_direct() works as expected even
+ if c.project is wrong.
+
+ This can happen when a notify task is triggered on project A (thus
+ setting c.project to A) and then calls Mailbox.fire_ready() which fires
+ pending Notifications on any waiting Mailbox, regardless of project,
+ but doesn't update c.project.
+ """
+ project1 = c.project
+ project2 = M.Project.query.get(shortname='test2')
+ assert_equal(project1.shortname, 'test')
+ c.user = M.User.query.get(username='test-user')
+ wiki = project1.app_instance('wiki')
+ page = WM.Page.query.get(app_config_id=wiki.config._id)
+ notification = M.Notification(
+ _id='_id',
+ ref=page.ref,
+ from_address='from_address',
+ reply_to_address='reply_to_address',
+ in_reply_to='in_reply_to',
+ references=['a'],
+ subject='subject',
+ text='text',
+ )
+ notification.footer = lambda: ' footer'
+ c.project = project2
+ notification.send_direct(c.user._id)
+ sendmail.post.assert_called_once_with(
+ destinations=[str(c.user._id)],
+ fromaddr='from_address',
+ reply_to='reply_to_address',
+ subject='subject',
+ message_id='_id',
+ in_reply_to='in_reply_to',
+ references=['a'],
+ sender='wiki@test.p.in.localhost',
+ text='text footer',
+ )
+
+
+class TestPostNotifications(unittest.TestCase):
+
+ def setUp(self):
+ setup_basic_test()
+ self.setup_with_tools()
+
+ @td.with_wiki
+ def setup_with_tools(self):
+ setup_global_objects()
+ g.set_app('wiki')
+ _clear_subscriptions()
+ _clear_notifications()
+ ThreadLocalORMSession.flush_all()
+ ThreadLocalORMSession.close_all()
+ self.pg = WM.Page.query.get(app_config_id=c.app.config._id)
+ M.notification.MAILBOX_QUIESCENT = None # disable message combining
+ while M.MonQTask.run_ready('setup'):
+ ThreadLocalORMSession.flush_all()
+
+ def test_post_notification(self):
+ self._post_notification()
+ ThreadLocalORMSession.flush_all()
+ M.MonQTask.list()
+ t = M.MonQTask.get()
+ assert t.args[1] == self.pg.index_id()
+
+ def test_post_user_notification(self):
+ u = M.User.query.get(username='test-admin')
+ M.Notification.post_user(u, self.pg, 'metadata')
+ ThreadLocalORMSession.flush_all()
+ ThreadLocalORMSession.close_all()
+ flash_msgs = list(h.pop_user_notifications(u))
+ assert len(flash_msgs) == 1, flash_msgs
+ msg = flash_msgs[0]
+ assert msg['text'].startswith('Home modified by Test Admin')
+ assert msg['subject'].startswith('[test:wiki]')
+ flash_msgs = list(h.pop_user_notifications(u))
+ assert not flash_msgs, flash_msgs
+
+ def test_delivery(self):
+ self._subscribe()
+ self._post_notification()
+ M.MonQTask.run_ready()
+ ThreadLocalORMSession.flush_all()
+ M.MonQTask.run_ready()
+ ThreadLocalORMSession.flush_all()
+ assert M.Mailbox.query.find().count() == 1
+ mbox = M.Mailbox.query.get()
+ assert len(mbox.queue) == 1
+ assert not mbox.queue_empty
+
+ def test_email(self):
+ self._subscribe() # as current user: test-admin
+ user2 = M.User.query.get(username='test-user-2')
+ self._subscribe(user=user2)
+ self._post_notification()
+ ThreadLocalORMSession.flush_all()
+
+ assert_equal(M.Notification.query.get()
+ ['from_address'], '"Test Admin" <te...@users.localhost>')
+ assert_equal(M.Mailbox.query.find().count(), 2)
+
+ # sends the notification out into "mailboxes", and from mailboxes into
+ # email tasks
+ M.MonQTask.run_ready()
+ mboxes = M.Mailbox.query.find().all()
+ assert_equal(len(mboxes), 2)
+ assert_equal(len(mboxes[0].queue), 1)
+ assert not mboxes[0].queue_empty
+ assert_equal(len(mboxes[1].queue), 1)
+ assert not mboxes[1].queue_empty
+
+ email_tasks = M.MonQTask.query.find({'state': 'ready'}).all()
+ # make sure both subscribers will get an email
+ assert_equal(len(email_tasks), 2)
+
+ first_destinations = [e.kwargs['destinations'][0] for e in email_tasks]
+ assert_in(str(c.user._id), first_destinations)
+ assert_in(str(user2._id), first_destinations)
+ assert_equal(email_tasks[0].kwargs['fromaddr'],
+ '"Test Admin" <te...@users.localhost>')
+ assert_equal(email_tasks[1].kwargs['fromaddr'],
+ '"Test Admin" <te...@users.localhost>')
+ assert_equal(email_tasks[0].kwargs['sender'],
+ 'wiki@test.p.in.localhost')
+ assert_equal(email_tasks[1].kwargs['sender'],
+ 'wiki@test.p.in.localhost')
+ assert email_tasks[0].kwargs['text'].startswith(
+ 'Home modified by Test Admin')
+ assert 'you indicated interest in ' in email_tasks[0].kwargs['text']
+
+ def test_permissions(self):
+ # Notification should only be delivered if user has read perms on the
+ # artifact. The perm check happens just before the mail task is
+ # posted.
+ u = M.User.query.get(username='test-admin')
+ self._subscribe(user=u)
+ # Simulate a permission check failure.
+
+ def patched_has_access(*args, **kw):
+ def predicate(*args, **kw):
+ return False
+ return predicate
+ from allura.model.notification import security
+ orig = security.has_access
+ security.has_access = patched_has_access
+ try:
+ # this will create a notification task
+ self._post_notification()
+ ThreadLocalORMSession.flush_all()
+ # running the notification task will create a mail task if the
+ # permission check passes...
+ M.MonQTask.run_ready()
+ ThreadLocalORMSession.flush_all()
+ # ...but in this case it doesn't create a mail task since we
+ # forced the perm check to fail
+ assert M.MonQTask.get() == None
+ finally:
+ security.has_access = orig
+
+ def test_footer(self):
+ footer = MailFooter.monitored(
+ 'test@mail.com',
+ 'http://test1.com',
+ 'http://test2.com')
+ assert 'test@mail.com is subscribed to http://test1.com' in footer
+ assert 'admin can change settings at http://test2.com' in footer
+ footer = MailFooter.standard(M.Notification())
+ self.assertIn('Sent from localhost because you indicated interest',
+ footer)
+
+ def _subscribe(self, **kw):
+ self.pg.subscribe(type='direct', **kw)
+ ThreadLocalORMSession.flush_all()
+ ThreadLocalORMSession.close_all()
+
+ def _post_notification(self):
+ return M.Notification.post(self.pg, 'metadata')
+
+
+class TestSubscriptionTypes(unittest.TestCase):
+
+ def setUp(self):
+ setup_basic_test()
+ self.setup_with_tools()
+
+ @td.with_wiki
+ def setup_with_tools(self):
+ setup_global_objects()
+ g.set_app('wiki')
+ _clear_subscriptions()
+ _clear_notifications()
+ ThreadLocalORMSession.flush_all()
+ ThreadLocalORMSession.close_all()
+ self.pg = WM.Page.query.get(app_config_id=c.app.config._id)
+ M.notification.MAILBOX_QUIESCENT = None # disable message combining
+
+ def test_direct_sub(self):
+ self._subscribe()
+ self._post_notification(text='A')
+ self._post_notification(text='B')
+ ThreadLocalORMSession.flush_all()
+ ThreadLocalORMSession.close_all()
+ M.Mailbox.fire_ready()
+
+ def test_digest_sub(self):
+ self._subscribe(type='digest')
+ self._post_notification(text='x' * 1024)
+ self._post_notification()
+ M.Mailbox.fire_ready()
+
+ def test_summary_sub(self):
+ self._subscribe(type='summary')
+ self._post_notification(text='x' * 1024)
+ self._post_notification()
+ M.Mailbox.fire_ready()
+
+ def test_message(self):
+ self._test_message()
+
+ self.setUp()
+ self._test_message()
+
+ self.setUp()
+ M.notification.MAILBOX_QUIESCENT = timedelta(minutes=1)
+ # will raise "assert msg is not None" since the new message is not 1
+ # min old:
+ self.assertRaises(AssertionError, self._test_message)
+
+ def _test_message(self):
+ self._subscribe()
+ thd = M.Thread.query.get(ref_id=self.pg.index_id())
+ thd.post('This is a very cool message')
+ M.MonQTask.run_ready()
+ ThreadLocalORMSession.flush_all()
+ M.Mailbox.fire_ready()
+ ThreadLocalORMSession.flush_all()
+ ThreadLocalORMSession.close_all()
+ msg = M.MonQTask.query.get(
+ task_name='allura.tasks.mail_tasks.sendmail',
+ state='ready')
+ assert msg is not None
+ assert 'Home@wiki.test.p' in msg.kwargs['reply_to']
+ u = M.User.by_username('test-admin')
+ assert str(u._id) in msg.kwargs['fromaddr'], msg.kwargs['fromaddr']
+
+ def _clear_subscriptions(self):
+ M.Mailbox.query.remove({})
+ ThreadLocalORMSession.flush_all()
+ ThreadLocalORMSession.close_all()
+
+ def _subscribe(self, type='direct', topic=None):
+ self.pg.subscribe(type=type, topic=topic)
+ ThreadLocalORMSession.flush_all()
+ ThreadLocalORMSession.close_all()
+
+ def _post_notification(self, text=None):
+ return M.Notification.post(self.pg, 'metadata', text=text)
+
+ @mock.patch('allura.model.notification.defaultdict')
+ @mock.patch('allura.model.notification.Notification')
+ def test_direct_accumulation(self, mocked_notification, mocked_defaultdict):
+ class OrderedDefaultDict(collections.OrderedDict):
+
+ def __init__(self, factory=list, *a, **kw):
+ self._factory = factory
+ super(OrderedDefaultDict, self).__init__(*a, **kw)
+
+ def __getitem__(self, key):
+ if key not in self:
+ value = self[key] = self._factory()
+ else:
+ value = super(OrderedDefaultDict, self).__getitem__(key)
+ return value
+
+ notifications = mocked_notification.query.find.return_value.all.return_value = [
+ mock.Mock(_id='n0', topic='metadata', subject='s1',
+ from_address='f1', reply_to_address='rt1', author_id='a1'),
+ mock.Mock(_id='n1', topic='metadata', subject='s2',
+ from_address='f2', reply_to_address='rt2', author_id='a2'),
+ mock.Mock(_id='n2', topic='metadata', subject='s2',
+ from_address='f2', reply_to_address='rt2', author_id='a2'),
+ mock.Mock(_id='n3', topic='message', subject='s3',
+ from_address='f3', reply_to_address='rt3', author_id='a3'),
+ mock.Mock(_id='n4', topic='message', subject='s3',
+ from_address='f3', reply_to_address='rt3', author_id='a3'),
+ ]
+ mocked_defaultdict.side_effect = OrderedDefaultDict
+
+ u0 = bson.ObjectId()
+ mbox = M.Mailbox(type='direct', user_id=u0,
+ queue=['n0', 'n1', 'n2', 'n3', 'n4'])
+ mbox.fire('now')
+
+ mocked_notification.query.find.assert_called_once_with(
+ {'_id': {'$in': ['n0', 'n1', 'n2', 'n3', 'n4']}})
+ # first notification should be sent direct, as its key values are
+ # unique
+ notifications[0].send_direct.assert_called_once_with(u0)
+ # next two notifications should be sent as a digest as they have
+ # matching key values
+ mocked_notification.send_digest.assert_called_once_with(
+ u0, 'f2', 's2', [notifications[1], notifications[2]], 'rt2')
+ # final two should be sent direct even though they matching keys, as
+ # they are messages
+ notifications[3].send_direct.assert_called_once_with(u0)
+ notifications[4].send_direct.assert_called_once_with(u0)
+
+ def test_send_direct_disabled_user(self):
+ user = M.User.by_username('test-admin')
+ thd = M.Thread.query.get(ref_id=self.pg.index_id())
+ notification = M.Notification()
+ notification.ref_id = thd.index_id()
+ user.disabled = True
+ ThreadLocalORMSession.flush_all()
+ notification.send_direct(user._id)
+ count = M.MonQTask.query.find(dict(
+ task_name='allura.tasks.mail_tasks.sendmail',
+ state='ready')).count()
+ assert_equal(count, 0)
+ user.disabled = False
+ ThreadLocalORMSession.flush_all()
+ notification.send_direct(user._id)
+ count = M.MonQTask.query.find(dict(
+ task_name='allura.tasks.mail_tasks.sendmail',
+ state='ready')).count()
+ assert_equal(count, 1)
+
+ @mock.patch('allura.model.notification.Notification.ref')
+ def test_send_digest_disabled_user(self, ref):
+ thd = M.Thread.query.get(ref_id=self.pg.index_id())
+ notification = M.Notification()
+ notification.ref_id = thd.index_id()
+ ref.artifact = thd
+ user = M.User.by_username('test-admin')
+ user.disabled = True
+ ThreadLocalORMSession.flush_all()
+ M.Notification.send_digest(
+ user._id, 'test@mail.com', 'subject', [notification])
+ count = M.MonQTask.query.find(dict(
+ task_name='allura.tasks.mail_tasks.sendmail',
+ state='ready')).count()
+ assert_equal(count, 0)
+ user.disabled = False
+ ThreadLocalORMSession.flush_all()
+ M.Notification.send_digest(
+ user._id, 'test@mail.com', 'subject', [notification])
+ count = M.MonQTask.query.find(dict(
+ task_name='allura.tasks.mail_tasks.sendmail',
+ state='ready')).count()
+ assert_equal(count, 1)
+
+
+def _clear_subscriptions():
+ M.Mailbox.query.remove({})
+
+
+def _clear_notifications():
+ M.Notification.query.remove({})
http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/model/test_project.py
----------------------------------------------------------------------
diff --git a/tests/model/test_project.py b/tests/model/test_project.py
new file mode 100644
index 0000000..f212d9b
--- /dev/null
+++ b/tests/model/test_project.py
@@ -0,0 +1,191 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Model tests for project
+"""
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
+from nose import with_setup
+from nose.tools import assert_equals, assert_in
+from pylons import tmpl_context as c
+from ming.orm.ormsession import ThreadLocalORMSession
+
+from allura import model as M
+from allura.lib import helpers as h
+from allura.tests import decorators as td
+from alluratest.controller import setup_basic_test, setup_global_objects
+from allura.lib.exceptions import ToolError, Invalid
+from mock import MagicMock, patch
+
+
+def setUp():  # fixture for this module; also passed explicitly to @with_setup on two tests below
+ setup_basic_test()
+ setup_with_tools()  # runs after the basic setup; defined just below
+
+
+@td.with_wiki  # presumably ensures a wiki tool is installed before the body runs -- confirm in allura.tests.decorators
+def setup_with_tools():  # sets up the global objects (the tests below rely on `c.project`) under the with_wiki decorator
+ setup_global_objects()
+
+
+def test_project():  # broad smoke test of Project: menus, sitemap, URLs, and install/uninstall rules for app mount points
+ assert_equals(type(c.project.sidebar_menu()), list)
+ assert_in(c.project.script_name, c.project.url())
+ old_proj = c.project  # remembered so it can be checked as a parent of test/sub1 below
+ h.set_context('test/sub1', neighborhood='Projects')  # switch the context to 'test/sub1'
+ assert_equals(type(c.project.sidebar_menu()), list)
+ assert_equals(type(c.project.sitemap()), list)
+ assert_equals(c.project.sitemap()[1].label, 'Admin')  # second sitemap entry is labelled 'Admin'
+ assert_in(old_proj, list(c.project.parent_iter()))
+ h.set_context('test', 'wiki', neighborhood='Projects')  # switch the context to project 'test', 'wiki'
+ adobe_nbhd = M.Neighborhood.query.get(name='Adobe')
+ p = M.Project.query.get(
+ shortname='adobe-1', neighborhood_id=adobe_nbhd._id)
+ # assert 'http' in p.url() # We moved adobe into /adobe/, not
+ # http://adobe....
+ assert_in(p.script_name, p.url())
+ assert_equals(c.project.shortname, 'test')
+ assert_in('<p>', c.project.description_html)
+ c.project.uninstall_app('hello-test-mount-point')  # uninstall first, then install the mount point below
+ ThreadLocalORMSession.flush_all()
+
+ c.project.install_app('Wiki', 'hello-test-mount-point')
+ c.project.support_page = 'hello-test-mount-point'  # expected to be reset to '' after the uninstall below
+ assert_equals(c.project.app_config('wiki').tool_name, 'wiki')
+ ThreadLocalORMSession.flush_all()
+ with td.raises(ToolError):
+ # already installed
+ c.project.install_app('Wiki', 'hello-test-mount-point')
+ ThreadLocalORMSession.flush_all()
+ c.project.uninstall_app('hello-test-mount-point')
+ ThreadLocalORMSession.flush_all()
+ with td.raises(ToolError):
+ # mount point reserved
+ c.project.install_app('Wiki', 'feed')
+ with td.raises(ToolError):
+ # mount point too long
+ c.project.install_app('Wiki', 'a' * 64)
+ with td.raises(ToolError):
+ # mount point must begin with letter
+ c.project.install_app('Wiki', '1')
+ # single letter mount points are allowed
+ c.project.install_app('Wiki', 'a')
+ # Make sure the project support page is reset if the tool it was pointing
+ # to is uninstalled.
+ assert c.project.support_page == ''
+ app_config = c.project.app_config('hello')
+ app_inst = c.project.app_instance(app_config)  # NOTE(review): app_inst is reassigned twice below and never read; these look like smoke calls only
+ app_inst = c.project.app_instance('hello')
+ app_inst = c.project.app_instance('hello2123')
+ c.project.breadcrumbs()  # result unused: only checks that the call does not raise
+ c.app.config.breadcrumbs()  # result unused: only checks that the call does not raise
+
+
+def test_project_index():  # Project.index() must return a mapping with the expected keys, and its 'id' must equal index_id()
+ project, idx = c.project, c.project.index()
+ assert 'id' in idx
+ assert idx['id'] == project.index_id()
+ assert 'title' in idx  # the remaining asserts check key presence only, not values
+ assert 'type_s' in idx
+ assert 'deleted_b' in idx
+ assert 'private_b' in idx
+ assert 'neighborhood_id_s' in idx
+ assert 'short_description_t' in idx
+ assert 'url_s' in idx
+
+
+def test_subproject():  # new_subproject must raise ToolError when the shortname validator rejects the name; a subproject and a nested one can otherwise be created, then deleted
+ project = M.Project.query.get(shortname='test')
+ with td.raises(ToolError):
+ with patch('allura.lib.plugin.ProjectRegistrationProvider') as Provider:
+ Provider.get().shortname_validator.to_python.side_effect = Invalid(  # make the patched validator raise Invalid
+ 'name', 'value', {})
+ # name doesn't validate
+ sp = project.new_subproject('test-proj-nose')
+ sp = project.new_subproject('test-proj-nose')  # presumably outside both `with` blocks (nesting is not visible here), so the same name now succeeds
+ spp = sp.new_subproject('spp')  # NOTE(review): spp is never read; the call only exercises nested subproject creation
+ ThreadLocalORMSession.flush_all()
+ sp.delete()
+ ThreadLocalORMSession.flush_all()
+
+
+@td.with_wiki
+def test_anchored_tools():  # with 'wiki' and 'tickets' anchored on the neighborhood, Wiki comes first and a missing 'tickets' tool triggers install_app
+ c.project.neighborhood.anchored_tools = 'wiki:Wiki, tickets:Ticket'
+ c.project.install_app = MagicMock()  # replace install_app with a mock so its calls can be inspected below
+ assert_equals(c.project.sitemap()[0].label, 'Wiki')
+ assert_equals(c.project.install_app.call_args[0][0], 'tickets')  # first positional argument of the recorded install_app call
+ assert_equals(c.project.ordered_mounts()[0]['ac'].tool_name, 'wiki')
+
+
+def test_set_ordinal_to_admin_tool():  # the last sitemap entry of project 'test' must be the admin tool
+ with h.push_config(c,  # presumably sets c.user and c.project for the duration of the block -- confirm in allura.lib.helpers
+ user=M.User.by_username('test-admin'),
+ project=M.Project.query.get(shortname='test')):
+ sm = c.project.sitemap()
+ assert_equals(sm[-1].tool_name, 'admin')
+
+
+@with_setup(setUp)  # run setUp as this test's own setup
+def test_users_and_roles():  # users_with_role('Admin') and admins() must agree, match the subproject, and drop a user once disabled
+ p = M.Project.query.get(shortname='test')
+ sub = p.direct_subprojects[0]  # first direct subproject of 'test'
+ u = M.User.by_username('test-admin')
+ assert p.users_with_role('Admin') == [u]
+ assert p.users_with_role('Admin') == sub.users_with_role('Admin')  # subproject reports the same Admin users as its parent
+ assert p.users_with_role('Admin') == p.admins()
+
+ user = p.admins()[0]
+ user.disabled = True  # disable the only admin
+ ThreadLocalORMSession.flush_all()
+ assert p.users_with_role('Admin') == []  # the disabled user is no longer listed
+ assert p.users_with_role('Admin') == p.admins()
+
+
+@with_setup(setUp)  # run setUp as this test's own setup
+def test_project_disabled_users():  # Project.users() must stop listing a user once that user is disabled
+ p = M.Project.query.get(shortname='test')
+ users = p.users()
+ assert users[0].username == 'test-admin'
+ user = M.User.by_username('test-admin')
+ user.disabled = True
+ ThreadLocalORMSession.flush_all()
+ users = p.users()  # query again after disabling
+ assert users == []
+
+def test_screenshot_unicode_serialization():  # Project.__json__() must serialize screenshots with non-ASCII captions and filenames, percent-encoding the filename in URLs
+ p = M.Project.query.get(shortname='test')
+ screenshot_unicode = M.ProjectFile(project_id=p._id, category='screenshot', caption="ConSelección", filename='ConSelección.jpg')  # NOTE(review): local is never read; the object is created for its side effect
+ screenshot_ascii = M.ProjectFile(project_id=p._id, category='screenshot', caption='test-screenshot', filename='test_file.jpg')  # NOTE(review): local is never read; the object is created for its side effect
+ ThreadLocalORMSession.flush_all()
+
+ serialized = p.__json__()
+ screenshots = sorted(serialized['screenshots'], key=lambda k: k['caption'])  # sort by caption for a stable order: "ConSelección" first, 'test-screenshot' second
+
+ assert len(screenshots) == 2
+ assert screenshots[0]['url'] == 'http://localhost/p/test/screenshot/ConSelecci%C3%B3n.jpg'  # the 'ó' appears as %C3%B3 in the URL
+ assert screenshots[0]['caption'] == "ConSelección"  # the caption itself is not encoded
+ assert screenshots[0]['thumbnail_url'] == 'http://localhost/p/test/screenshot/ConSelecci%C3%B3n.jpg/thumb'
+
+ assert screenshots[1]['url'] == 'http://localhost/p/test/screenshot/test_file.jpg'
+ assert screenshots[1]['caption'] == 'test-screenshot'
+ assert screenshots[1]['thumbnail_url'] == 'http://localhost/p/test/screenshot/test_file.jpg/thumb'