You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@allura.apache.org by he...@apache.org on 2015/05/29 22:40:32 UTC

[10/45] allura git commit: [#7878] Used 2to3 to see what issues would come up

http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/model/test_auth.py
----------------------------------------------------------------------
diff --git a/tests/model/test_auth.py b/tests/model/test_auth.py
new file mode 100644
index 0000000..959ef10
--- /dev/null
+++ b/tests/model/test_auth.py
@@ -0,0 +1,420 @@
+# -*- coding: utf-8 -*-
+
+#       Licensed to the Apache Software Foundation (ASF) under one
+#       or more contributor license agreements.  See the NOTICE file
+#       distributed with this work for additional information
+#       regarding copyright ownership.  The ASF licenses this file
+#       to you under the Apache License, Version 2.0 (the
+#       "License"); you may not use this file except in compliance
+#       with the License.  You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#       Unless required by applicable law or agreed to in writing,
+#       software distributed under the License is distributed on an
+#       "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#       KIND, either express or implied.  See the License for the
+#       specific language governing permissions and limitations
+#       under the License.
+
+"""
+Model tests for auth
+"""
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
+
+from nose.tools import (
+    with_setup,
+    assert_equal,
+    assert_not_equal,
+    assert_true,
+    assert_not_in,
+    assert_in,
+)
+from pylons import tmpl_context as c, app_globals as g
+from webob import Request
+from mock import patch, Mock
+from datetime import datetime, timedelta
+
+from ming.orm.ormsession import ThreadLocalORMSession
+from ming.odm import session
+
+from allura import model as M
+from allura.lib import plugin
+from allura.tests import decorators as td
+from alluratest.controller import setup_basic_test, setup_global_objects, setup_functional_test
+
+
+def setUp():
+    """Per-test fixture handed to ``with_setup`` by the tests in this module."""
+    setup_basic_test()
+    # close every thread-local ORM session before setting up global objects
+    ThreadLocalORMSession.close_all()
+    setup_global_objects()
+
+
+@with_setup(setUp)
+def test_email_address():
+    addr = M.EmailAddress(email='test_admin@domain.net',
+                          claimed_by_user_id=c.user._id)
+    ThreadLocalORMSession.flush_all()
+    assert addr.claimed_by_user() == c.user
+    addr2 = M.EmailAddress.create('test@domain.net')
+    addr3 = M.EmailAddress.create('test_admin@domain.net')
+
+    # Duplicate emails are allowed, until the email is confirmed
+    assert addr3 is not addr
+
+    assert addr2 is not addr
+    assert addr2
+    addr4 = M.EmailAddress.create('test@DOMAIN.NET')
+    assert addr4 is not addr2
+
+    assert addr is c.user.address_object('test_admin@domain.net')
+    c.user.claim_address('test@DOMAIN.NET')
+    assert 'test@domain.net' in c.user.email_addresses
+
+
+@with_setup(setUp)
+def test_email_address_lookup_helpers():
+    addr = M.EmailAddress.create('TEST@DOMAIN.NET')
+    nobody = M.EmailAddress.create('nobody@example.com')
+    ThreadLocalORMSession.flush_all()
+    assert_equal(addr.email, 'TEST@domain.net')
+
+    assert_equal(M.EmailAddress.get(email='TEST@DOMAIN.NET'), addr)
+    assert_equal(M.EmailAddress.get(email='TEST@domain.net'), addr)
+    assert_equal(M.EmailAddress.get(email='test@domain.net'), None)
+    assert_equal(M.EmailAddress.get(email=None), None)
+    assert_equal(M.EmailAddress.get(email='nobody@example.com'), nobody)
+    # invalid email returns None, but not nobody@example.com as before
+    assert_equal(M.EmailAddress.get(email='invalid'), None)
+
+    assert_equal(M.EmailAddress.find(dict(email='TEST@DOMAIN.NET')).all(), [addr])
+    assert_equal(M.EmailAddress.find(dict(email='TEST@domain.net')).all(), [addr])
+    assert_equal(M.EmailAddress.find(dict(email='test@domain.net')).all(), [])
+    assert_equal(M.EmailAddress.find(dict(email=None)).all(), [])
+    assert_equal(M.EmailAddress.find(dict(email='nobody@example.com')).all(), [nobody])
+    # invalid email returns empty query, but not nobody@example.com as before
+    assert_equal(M.EmailAddress.find(dict(email='invalid')).all(), [])
+
+
+@with_setup(setUp)
+def test_email_address_canonical():
+    assert_equal(M.EmailAddress.canonical('nobody@EXAMPLE.COM'),
+                 'nobody@example.com')
+    assert_equal(M.EmailAddress.canonical('nobody@example.com'),
+                 'nobody@example.com')
+    assert_equal(M.EmailAddress.canonical('I Am Nobody <no...@example.com>'),
+                 'nobody@example.com')
+    assert_equal(M.EmailAddress.canonical('  nobody@example.com\t'),
+                 'nobody@example.com')
+    assert_equal(M.EmailAddress.canonical('I Am@Nobody <no...@example.com> '),
+                 'nobody@example.com')
+    assert_equal(M.EmailAddress.canonical(' No@body <no...@example.com> '),
+                 'no@body@example.com')
+    assert_equal(M.EmailAddress.canonical('no@body@example.com'),
+                 'no@body@example.com')
+    assert_equal(M.EmailAddress.canonical('invalid'), None)
+
+@with_setup(setUp)
+def test_email_address_send_verification_link():
+    addr = M.EmailAddress(email='test_admin@domain.net',
+                          claimed_by_user_id=c.user._id)
+
+    addr.send_verification_link()
+
+    with patch('allura.tasks.mail_tasks.smtp_client._client') as _client:
+        M.MonQTask.run_ready()
+    return_path, rcpts, body = _client.sendmail.call_args[0]
+    assert_equal(rcpts, ['test_admin@domain.net'])
+
+
+@td.with_user_project('test-admin')
+@with_setup(setUp)
+def test_user():
+    assert c.user.url() .endswith('/u/test-admin/')
+    assert c.user.script_name .endswith('/u/test-admin/')
+    assert_equal(set(p.shortname for p in c.user.my_projects()),
+                 set(['test', 'test2', 'u/test-admin', 'adobe-1', '--init--']))
+    # delete one of the projects and make sure it won't appear in my_projects()
+    p = M.Project.query.get(shortname='test2')
+    p.deleted = True
+    ThreadLocalORMSession.flush_all()
+    assert_equal(set(p.shortname for p in c.user.my_projects()),
+                 set(['test', 'u/test-admin', 'adobe-1', '--init--']))
+    u = M.User.register(dict(
+        username='nosetest-user'))
+    ThreadLocalORMSession.flush_all()
+    assert_equal(u.private_project().shortname, 'u/nosetest-user')
+    roles = g.credentials.user_roles(
+        u._id, project_id=u.private_project().root_project._id)
+    assert len(roles) == 3, roles
+    u.set_password('foo')
+    provider = plugin.LocalAuthenticationProvider(Request.blank('/'))
+    assert provider._validate_password(u, 'foo')
+    assert not provider._validate_password(u, 'foobar')
+    u.set_password('foobar')
+    assert provider._validate_password(u, 'foobar')
+    assert not provider._validate_password(u, 'foo')
+
+
+@with_setup(setUp)
+def test_user_project_creates_on_demand():
+    u = M.User.register(dict(username='foobar123'), make_project=False)
+    ThreadLocalORMSession.flush_all()
+    assert not M.Project.query.get(shortname='u/foobar123')
+    assert u.private_project()
+    assert M.Project.query.get(shortname='u/foobar123')
+
+
+@with_setup(setUp)
+def test_user_project_already_deleted_creates_on_demand():
+    u = M.User.register(dict(username='foobar123'), make_project=True)
+    p = M.Project.query.get(shortname='u/foobar123')
+    p.deleted = True
+    ThreadLocalORMSession.flush_all()
+    assert not M.Project.query.get(shortname='u/foobar123', deleted=False)
+    assert u.private_project()
+    ThreadLocalORMSession.flush_all()
+    assert M.Project.query.get(shortname='u/foobar123', deleted=False)
+
+
+@with_setup(setUp)
+def test_user_project_does_not_create_on_demand_for_disabled_user():
+    u = M.User.register(
+        dict(username='foobar123', disabled=True), make_project=False)
+    ThreadLocalORMSession.flush_all()
+    assert not u.private_project()
+    assert not M.Project.query.get(shortname='u/foobar123')
+
+
+@with_setup(setUp)
+def test_user_project_does_not_create_on_demand_for_anonymous_user():
+    u = M.User.anonymous()
+    ThreadLocalORMSession.flush_all()
+    assert not u.private_project()
+    assert not M.Project.query.get(shortname='u/anonymous')
+    assert not M.Project.query.get(shortname='u/*anonymous')
+
+
+@with_setup(setUp)
+@patch('allura.model.auth.log')
+def test_user_by_email_address(log):
+    u1 = M.User.register(dict(username='abc1'), make_project=False)
+    u2 = M.User.register(dict(username='abc2'), make_project=False)
+    addr1 = M.EmailAddress(email='abc123@abc.me', confirmed=True,
+                           claimed_by_user_id=u1._id)
+    addr2 = M.EmailAddress(email='abc123@abc.me', confirmed=True,
+                           claimed_by_user_id=u2._id)
+    # both users are disabled
+    u1.disabled, u2.disabled = True, True
+    ThreadLocalORMSession.flush_all()
+    assert_equal(M.User.by_email_address('abc123@abc.me'), None)
+    assert_equal(log.warn.call_count, 0)
+
+    # only u2 is active
+    u1.disabled, u2.disabled = True, False
+    ThreadLocalORMSession.flush_all()
+    assert_equal(M.User.by_email_address('abc123@abc.me'), u2)
+    assert_equal(log.warn.call_count, 0)
+
+    # both are active
+    u1.disabled, u2.disabled = False, False
+    ThreadLocalORMSession.flush_all()
+    assert_in(M.User.by_email_address('abc123@abc.me'), [u1, u2])
+    assert_equal(log.warn.call_count, 1)
+
+    # invalid email returns None, but not user which claimed
+    # nobody@example.com as before
+    nobody = M.EmailAddress(email='nobody@example.com', confirmed=True,
+                            claimed_by_user_id=u1._id)
+    ThreadLocalORMSession.flush_all()
+    assert_equal(M.User.by_email_address('nobody@example.com'), u1)
+    assert_equal(M.User.by_email_address('invalid'), None)
+
+
+@with_setup(setUp)
+def test_project_role():
+    role = M.ProjectRole(project_id=c.project._id, name='test_role')
+    M.ProjectRole.by_user(c.user, upsert=True).roles.append(role._id)
+    ThreadLocalORMSession.flush_all()
+    roles = g.credentials.user_roles(
+        c.user._id, project_id=c.project.root_project._id)
+    roles_ids = [r['_id'] for r in roles]
+    roles = M.ProjectRole.query.find({'_id': {'$in': roles_ids}})
+    for pr in roles:
+        assert pr.display()
+        pr.special
+        assert pr.user in (c.user, None, M.User.anonymous())
+
+
+@with_setup(setUp)
+def test_default_project_roles():
+    roles = dict(
+        (pr.name, pr)
+        for pr in M.ProjectRole.query.find(dict(
+            project_id=c.project._id)).all()
+        if pr.name)
+    assert 'Admin' in list(roles.keys()), list(roles.keys())
+    assert 'Developer' in list(roles.keys()), list(roles.keys())
+    assert 'Member' in list(roles.keys()), list(roles.keys())
+    assert roles['Developer']._id in roles['Admin'].roles
+    assert roles['Member']._id in roles['Developer'].roles
+
+    # There're 1 user assigned to project, represented by
+    # relational (vs named) ProjectRole's
+    assert len(roles) == M.ProjectRole.query.find(dict(
+        project_id=c.project._id)).count() - 1
+
+
+@with_setup(setUp)
+def test_email_address_claimed_by_user():
+    addr = M.EmailAddress(email='test_admin@domain.net',
+                          claimed_by_user_id=c.user._id)
+    c.user.disabled = True
+    ThreadLocalORMSession.flush_all()
+    assert addr.claimed_by_user() is None
+
+
+@td.with_user_project('test-admin')
+@with_setup(setUp)
+def test_user_projects_by_role():
+    assert_equal(set(p.shortname for p in c.user.my_projects()),
+                 set(['test', 'test2', 'u/test-admin', 'adobe-1', '--init--']))
+    assert_equal(set(p.shortname for p in c.user.my_projects_by_role_name('Admin')),
+                 set(['test', 'test2', 'u/test-admin', 'adobe-1', '--init--']))
+    # Remove admin access from c.user to test2 project
+    project = M.Project.query.get(shortname='test2')
+    admin_role = M.ProjectRole.by_name('Admin', project)
+    developer_role = M.ProjectRole.by_name('Developer', project)
+    user_role = M.ProjectRole.by_user(c.user, project=project, upsert=True)
+    user_role.roles.remove(admin_role._id)
+    user_role.roles.append(developer_role._id)
+    ThreadLocalORMSession.flush_all()
+    g.credentials.clear()
+    assert_equal(set(p.shortname for p in c.user.my_projects()),
+                 set(['test', 'test2', 'u/test-admin', 'adobe-1', '--init--']))
+    assert_equal(set(p.shortname for p in c.user.my_projects_by_role_name('Admin')),
+                 set(['test', 'u/test-admin', 'adobe-1', '--init--']))
+
+
+@td.with_user_project('test-admin')
+@with_setup(setUp)
+def test_user_projects_unnamed():
+    """
+    Confirm that spurious ProjectRoles associating a user with
+    a project to which they do not belong to any named group
+    don't cause the user to count as a member of the project.
+    """
+    sub1 = M.Project.query.get(shortname='test/sub1')
+    M.ProjectRole(
+        user_id=c.user._id,
+        project_id=sub1._id)
+    ThreadLocalORMSession.flush_all()
+    project_names = [p.shortname for p in c.user.my_projects()]
+    assert_not_in('test/sub1', project_names)
+    assert_in('test', project_names)
+
+
+@patch.object(g, 'user_message_max_messages', 3)
+def test_check_sent_user_message_times():
+    user1 = M.User.register(dict(username='test-user'), make_project=False)
+    time1 = datetime.utcnow() - timedelta(minutes=30)
+    time2 = datetime.utcnow() - timedelta(minutes=45)
+    time3 = datetime.utcnow() - timedelta(minutes=70)
+    user1.sent_user_message_times = [time1, time2, time3]
+    assert user1.can_send_user_message()
+    assert_equal(len(user1.sent_user_message_times), 2)
+    user1.sent_user_message_times.append(
+        datetime.utcnow() - timedelta(minutes=15))
+    assert not user1.can_send_user_message()
+
+
+@td.with_user_project('test-admin')
+@with_setup(setUp)
+def test_user_track_active():
+    # without this session flushing inside track_active raises Exception
+    setup_functional_test()
+    c.user = M.User.by_username('test-admin')
+
+    assert_equal(c.user.last_access['session_date'], None)
+    assert_equal(c.user.last_access['session_ip'], None)
+    assert_equal(c.user.last_access['session_ua'], None)
+
+    req = Mock(headers={'User-Agent': 'browser'}, remote_addr='addr')
+    c.user.track_active(req)
+    c.user = M.User.by_username(c.user.username)
+    assert_not_equal(c.user.last_access['session_date'], None)
+    assert_equal(c.user.last_access['session_ip'], 'addr')
+    assert_equal(c.user.last_access['session_ua'], 'browser')
+
+    # ensure that session activity tracked with a whole-day granularity
+    prev_date = c.user.last_access['session_date']
+    c.user.track_active(req)
+    c.user = M.User.by_username(c.user.username)
+    assert_equal(c.user.last_access['session_date'], prev_date)
+    yesterday = datetime.utcnow() - timedelta(1)
+    c.user.last_access['session_date'] = yesterday
+    session(c.user).flush(c.user)
+    c.user.track_active(req)
+    c.user = M.User.by_username(c.user.username)
+    assert_true(c.user.last_access['session_date'] > yesterday)
+
+    # ...or if IP or User Agent has changed
+    req.remote_addr = 'new addr'
+    c.user.track_active(req)
+    c.user = M.User.by_username(c.user.username)
+    assert_equal(c.user.last_access['session_ip'], 'new addr')
+    assert_equal(c.user.last_access['session_ua'], 'browser')
+    req.headers['User-Agent'] = 'new browser'
+    c.user.track_active(req)
+    c.user = M.User.by_username(c.user.username)
+    assert_equal(c.user.last_access['session_ip'], 'new addr')
+    assert_equal(c.user.last_access['session_ua'], 'new browser')
+
+
+@with_setup(setUp)
+def test_user_index():
+    c.user.email_addresses = ['email1', 'email2']
+    c.user.set_pref('email_address', 'email2')
+    idx = c.user.index()
+    assert_equal(idx['id'], c.user.index_id())
+    assert_equal(idx['title'], 'User test-admin')
+    assert_equal(idx['type_s'], 'User')
+    assert_equal(idx['username_s'], 'test-admin')
+    assert_equal(idx['email_addresses_t'], 'email1 email2')
+    assert_equal(idx['email_address_s'], 'email2')
+    assert_in('last_password_updated_dt', idx)
+    assert_equal(idx['disabled_b'], False)
+    assert_in('results_per_page_i', idx)
+    assert_in('email_format_s', idx)
+    assert_in('disable_user_messages_b', idx)
+    assert_equal(idx['display_name_t'], 'Test Admin')
+    assert_equal(idx['sex_s'], 'Unknown')
+    assert_in('birthdate_dt', idx)
+    assert_in('localization_s', idx)
+    assert_in('timezone_s', idx)
+    assert_in('socialnetworks_t', idx)
+    assert_in('telnumbers_t', idx)
+    assert_in('skypeaccount_s', idx)
+    assert_in('webpages_t', idx)
+    assert_in('skills_t', idx)
+    assert_in('last_access_login_date_dt', idx)
+    assert_in('last_access_login_ip_s', idx)
+    assert_in('last_access_login_ua_t', idx)
+    assert_in('last_access_session_date_dt', idx)
+    assert_in('last_access_session_ip_s', idx)
+    assert_in('last_access_session_ua_t', idx)
+    # provided bby auth provider
+    assert_in('user_registration_date_dt', idx)
+
+@with_setup(setUp)
+def test_user_index_none_values():
+    c.user.email_addresses = [None]
+    c.user.set_pref('telnumbers', [None])
+    c.user.set_pref('webpages', [None])
+    idx = c.user.index()
+    assert_equal(idx['email_addresses_t'], '')
+    assert_equal(idx['telnumbers_t'], '')
+    assert_equal(idx['webpages_t'], '')

http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/model/test_discussion.py
----------------------------------------------------------------------
diff --git a/tests/model/test_discussion.py b/tests/model/test_discussion.py
new file mode 100644
index 0000000..ab9c610
--- /dev/null
+++ b/tests/model/test_discussion.py
@@ -0,0 +1,519 @@
+# -*- coding: utf-8 -*-
+
+#       Licensed to the Apache Software Foundation (ASF) under one
+#       or more contributor license agreements.  See the NOTICE file
+#       distributed with this work for additional information
+#       regarding copyright ownership.  The ASF licenses this file
+#       to you under the Apache License, Version 2.0 (the
+#       "License"); you may not use this file except in compliance
+#       with the License.  You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#       Unless required by applicable law or agreed to in writing,
+#       software distributed under the License is distributed on an
+#       "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#       KIND, either express or implied.  See the License for the
+#       specific language governing permissions and limitations
+#       under the License.
+
+"""
+Model tests for artifact
+"""
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
+from io import StringIO
+import time
+from datetime import datetime, timedelta
+from cgi import FieldStorage
+
+from pylons import tmpl_context as c
+from nose.tools import assert_equals, with_setup
+import mock
+from mock import patch
+from nose.tools import assert_equal
+
+from ming.orm import session, ThreadLocalORMSession
+from webob import exc
+
+from allura import model as M
+from allura.lib import helpers as h
+from allura.tests import TestController
+from alluratest.controller import setup_global_objects
+
+
+def setUp():
+    """Per-test fixture handed to ``with_setup`` by the tests in this module."""
+    controller = TestController()
+    controller.setUp()
+    # request the wiki home page through the test app
+    controller.app.get('/wiki/Home/')
+    setup_global_objects()
+    ThreadLocalORMSession.close_all()
+    # set the context to project 'test', app 'wiki' in neighborhood 'Projects'
+    h.set_context('test', 'wiki', neighborhood='Projects')
+    ThreadLocalORMSession.flush_all()
+    ThreadLocalORMSession.close_all()
+
+
+def tearDown():
+    """Per-test cleanup: close all thread-local ORM sessions."""
+    ThreadLocalORMSession.close_all()
+
+
+@with_setup(setUp, tearDown)
+def test_discussion_methods():
+    d = M.Discussion(shortname='test', name='test')
+    assert d.thread_class() == M.Thread
+    assert d.post_class() == M.Post
+    assert d.attachment_class() == M.DiscussionAttachment
+    ThreadLocalORMSession.flush_all()
+    d.update_stats()
+    ThreadLocalORMSession.flush_all()
+    assert d.last_post == None
+    assert d.url().endswith('wiki/_discuss/')
+    assert d.index()['name_s'] == 'test'
+    assert d.subscription() == None
+    assert d.find_posts().count() == 0
+    jsn = d.__json__()
+    assert jsn['name'] == d.name
+    d.delete()
+    ThreadLocalORMSession.flush_all()
+    ThreadLocalORMSession.close_all()
+
+
+@with_setup(setUp, tearDown)
+def test_thread_methods():
+    d = M.Discussion(shortname='test', name='test')
+    t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+    assert t.discussion_class() == M.Discussion
+    assert t.post_class() == M.Post
+    assert t.attachment_class() == M.DiscussionAttachment
+    p0 = t.post('This is a post')
+    p1 = t.post('This is another post')
+    time.sleep(0.25)
+    t.post('This is a reply', parent_id=p0._id)
+    ThreadLocalORMSession.flush_all()
+    ThreadLocalORMSession.close_all()
+    d = M.Discussion.query.get(shortname='test')
+    t = d.threads[0]
+    assert d.last_post is not None
+    assert t.last_post is not None
+    t.create_post_threads(t.posts)
+    posts0 = t.find_posts(page=0, limit=10, style='threaded')
+    posts1 = t.find_posts(page=0, limit=10, style='timestamp')
+    assert posts0 != posts1
+    ts = p0.timestamp.replace(
+        microsecond=int(p0.timestamp.microsecond // 1000) * 1000)
+    posts2 = t.find_posts(page=0, limit=10, style='threaded', timestamp=ts)
+    assert len(posts2) > 0
+
+    assert 'wiki/_discuss/' in t.url()
+    assert t.index()['views_i'] == 0
+    assert not t.subscription
+    t.subscription = True
+    assert t.subscription
+    t.subscription = False
+    assert not t.subscription
+    assert t.post_count == 3
+    jsn = t.__json__()
+    assert '_id' in jsn
+    assert_equals(len(jsn['posts']), 3)
+    (p.approve() for p in (p0, p1))
+    assert t.num_replies == 2
+    t.spam()
+    assert t.num_replies == 0
+    ThreadLocalORMSession.flush_all()
+    assert len(t.find_posts()) == 0
+    t.delete()
+
+
+@with_setup(setUp, tearDown)
+def test_thread_new():
+    with mock.patch('allura.model.discuss.h.nonce') as nonce:
+        nonce.side_effect = ['deadbeef', 'deadbeef', 'beefdead']
+        d = M.Discussion(shortname='test', name='test')
+        t1 = M.Thread.new(discussion_id=d._id, subject='Test Thread One')
+        t2 = M.Thread.new(discussion_id=d._id, subject='Test Thread Two')
+        ThreadLocalORMSession.flush_all()
+        session(t1).expunge(t1)
+        session(t2).expunge(t2)
+        t1_2 = M.Thread.query.get(_id=t1._id)
+        t2_2 = M.Thread.query.get(_id=t2._id)
+        assert_equals(t1._id, 'deadbeef')
+        assert_equals(t2._id, 'beefdead')
+        assert_equals(t1_2.subject, 'Test Thread One')
+        assert_equals(t2_2.subject, 'Test Thread Two')
+
+
+@with_setup(setUp, tearDown)
+def test_post_methods():
+    d = M.Discussion(shortname='test', name='test')
+    t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+    p = t.post('This is a post')
+    p2 = t.post('This is another post')
+    assert p.discussion_class() == M.Discussion
+    assert p.thread_class() == M.Thread
+    assert p.attachment_class() == M.DiscussionAttachment
+    p.commit()
+    assert p.parent is None
+    assert p.subject == 'Test Thread'
+    assert_equals(p.attachments, [])
+    assert 'wiki/_discuss' in p.url()
+    assert p.reply_subject() == 'Re: Test Thread'
+    assert p.link_text() == p.subject
+
+    ss = p.history().first()
+    assert 'version' in h.get_first(ss.index(), 'title')
+    assert '#' in ss.shorthand_id()
+
+    jsn = p.__json__()
+    assert jsn["thread_id"] == t._id
+
+    (p.approve() for p in (p, p2))
+    assert t.num_replies == 1
+    p2.spam()
+    assert t.num_replies == 0
+    p.spam()
+    assert t.num_replies == 0
+    p.delete()
+    assert t.num_replies == 0
+
+
+@with_setup(setUp, tearDown)
+def test_attachment_methods():
+    d = M.Discussion(shortname='test', name='test')
+    t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+    p = t.post('This is a post')
+    p_att = p.attach('foo.text', StringIO('Hello, world!'),
+                     discussion_id=d._id,
+                     thread_id=t._id,
+                     post_id=p._id)
+    t_att = p.attach('foo2.text', StringIO('Hello, thread!'),
+                     discussion_id=d._id,
+                     thread_id=t._id)
+    d_att = p.attach('foo3.text', StringIO('Hello, discussion!'),
+                     discussion_id=d._id)
+
+    ThreadLocalORMSession.flush_all()
+    assert p_att.post == p
+    assert p_att.thread == t
+    assert p_att.discussion == d
+    for att in (p_att, t_att, d_att):
+        assert 'wiki/_discuss' in att.url()
+        assert 'attachment/' in att.url()
+
+    # Test notification in mail
+    t = M.Thread.new(discussion_id=d._id, subject='Test comment notification')
+    fs = FieldStorage()
+    fs.name = 'file_info'
+    fs.filename = 'fake.txt'
+    fs.type = 'text/plain'
+    fs.file = StringIO('this is the content of the fake file\n')
+    p = t.post(text='test message', forum=None, subject='', file_info=fs)
+    ThreadLocalORMSession.flush_all()
+    n = M.Notification.query.get(
+        subject='[test:wiki] Test comment notification')
+    assert '\nAttachment: fake.txt (37 Bytes; text/plain)' in n.text
+
+
+@with_setup(setUp, tearDown())
+def test_multiple_attachments():
+    test_file1 = FieldStorage()
+    test_file1.name = 'file_info'
+    test_file1.filename = 'test1.txt'
+    test_file1.type = 'text/plain'
+    test_file1.file = StringIO('test file1\n')
+    test_file2 = FieldStorage()
+    test_file2.name = 'file_info'
+    test_file2.filename = 'test2.txt'
+    test_file2.type = 'text/plain'
+    test_file2.file = StringIO('test file2\n')
+    d = M.Discussion(shortname='test', name='test')
+    t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+    test_post = t.post('test post')
+    test_post.add_multiple_attachments([test_file1, test_file2])
+    ThreadLocalORMSession.flush_all()
+    assert_equals(len(test_post.attachments), 2)
+    attaches = test_post.attachments
+    assert 'test1.txt' in [attaches[0].filename, attaches[1].filename]
+    assert 'test2.txt' in [attaches[0].filename, attaches[1].filename]
+
+
+@with_setup(setUp, tearDown)
+def test_add_attachment():
+    test_file = FieldStorage()
+    test_file.name = 'file_info'
+    test_file.filename = 'test.txt'
+    test_file.type = 'text/plain'
+    test_file.file = StringIO('test file\n')
+    d = M.Discussion(shortname='test', name='test')
+    t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+    test_post = t.post('test post')
+    test_post.add_attachment(test_file)
+    ThreadLocalORMSession.flush_all()
+    assert_equals(len(test_post.attachments), 1)
+    attach = test_post.attachments[0]
+    assert attach.filename == 'test.txt', attach.filename
+    assert attach.content_type == 'text/plain', attach.content_type
+
+
+def test_notification_two_attaches():
+    d = M.Discussion(shortname='test', name='test')
+    t = M.Thread.new(discussion_id=d._id, subject='Test comment notification')
+    fs1 = FieldStorage()
+    fs1.name = 'file_info'
+    fs1.filename = 'fake.txt'
+    fs1.type = 'text/plain'
+    fs1.file = StringIO('this is the content of the fake file\n')
+    fs2 = FieldStorage()
+    fs2.name = 'file_info'
+    fs2.filename = 'fake2.txt'
+    fs2.type = 'text/plain'
+    fs2.file = StringIO('this is the content of the fake file\n')
+    t.post(text='test message', forum=None, subject='', file_info=[fs1, fs2])
+    ThreadLocalORMSession.flush_all()
+    n = M.Notification.query.get(
+        subject='[test:wiki] Test comment notification')
+    assert '\nAttachment: fake.txt (37 Bytes; text/plain)  fake2.txt (37 Bytes; text/plain)' in n.text
+
+
+@with_setup(setUp, tearDown)
+def test_discussion_delete():
+    d = M.Discussion(shortname='test', name='test')
+    t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+    p = t.post('This is a post')
+    p.attach('foo.text', StringIO(''),
+             discussion_id=d._id,
+             thread_id=t._id,
+             post_id=p._id)
+    M.ArtifactReference.from_artifact(d)
+    rid = d.index_id()
+    ThreadLocalORMSession.flush_all()
+    d.delete()
+    ThreadLocalORMSession.flush_all()
+    assert_equals(M.ArtifactReference.query.find(dict(_id=rid)).count(), 0)
+
+
+@with_setup(setUp, tearDown)
+def test_thread_delete():
+    d = M.Discussion(shortname='test', name='test')
+    t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+    p = t.post('This is a post')
+    p.attach('foo.text', StringIO(''),
+             discussion_id=d._id,
+             thread_id=t._id,
+             post_id=p._id)
+    ThreadLocalORMSession.flush_all()
+    t.delete()
+
+
+@with_setup(setUp, tearDown)
+def test_post_delete():
+    d = M.Discussion(shortname='test', name='test')
+    t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+    p = t.post('This is a post')
+    p.attach('foo.text', StringIO(''),
+             discussion_id=d._id,
+             thread_id=t._id,
+             post_id=p._id)
+    ThreadLocalORMSession.flush_all()
+    p.delete()
+
+
+@with_setup(setUp, tearDown)
+def test_post_permission_check():
+    d = M.Discussion(shortname='test', name='test')
+    t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+    c.user = M.User.anonymous()
+    try:
+        t.post('This post will fail the check.')
+        assert False, "Expected an anonymous post to fail."
+    except exc.HTTPUnauthorized:
+        pass
+    t.post('This post will pass the check.', ignore_security=True)
+
+
+@with_setup(setUp, tearDown)
+def test_post_url_paginated():
+    d = M.Discussion(shortname='test', name='test')
+    t = M.Thread(discussion_id=d._id, subject='Test Thread')
+    p = []  # posts in display order
+    ts = datetime.utcnow() - timedelta(days=1)
+    for i in range(5):
+        ts += timedelta(minutes=1)
+        p.append(t.post('This is a post #%s' % i, timestamp=ts))
+
+    ts += timedelta(minutes=1)
+    p.insert(1, t.post(
+        'This is reply #0 to post #0', parent_id=p[0]._id, timestamp=ts))
+
+    ts += timedelta(minutes=1)
+    p.insert(2, t.post(
+        'This is reply #1 to post #0', parent_id=p[0]._id, timestamp=ts))
+
+    ts += timedelta(minutes=1)
+    p.insert(4, t.post(
+        'This is reply #0 to post #1', parent_id=p[3]._id, timestamp=ts))
+
+    ts += timedelta(minutes=1)
+    p.insert(6, t.post(
+        'This is reply #0 to post #2', parent_id=p[5]._id, timestamp=ts))
+
+    ts += timedelta(minutes=1)
+    p.insert(7, t.post(
+        'This is reply #1 to post #2', parent_id=p[5]._id, timestamp=ts))
+
+    ts += timedelta(minutes=1)
+    p.insert(8, t.post(
+        'This is reply #0 to reply #1 to post #2',
+        parent_id=p[7]._id, timestamp=ts))
+
+    # with default paging limit
+    for _p in p:
+        url = t.url() + '?limit=25#' + _p.slug
+        assert _p.url_paginated() == url, _p.url_paginated()
+
+    # with user paging limit
+    limit = 3
+    c.user.set_pref('results_per_page', limit)
+    for i, _p in enumerate(p):
+        page = i / limit
+        url = t.url() + '?limit=%s' % limit
+        if page > 0:
+            url += '&page=%s' % page
+        url += '#' + _p.slug
+        assert _p.url_paginated() == url, _p.url_paginated()
+
+
+@with_setup(setUp, tearDown)
+def test_post_url_paginated_with_artifact():
+    """Post.url_paginated should return link to attached artifact, if any"""
+    from forgewiki.model import Page
+    page = Page.upsert(title='Test Page')
+    thread = page.discussion_thread
+    comment = thread.post('Comment')
+    url = page.url() + '?limit=25#' + comment.slug
+    assert_equals(comment.url_paginated(), url)
+
+
+@with_setup(setUp, tearDown)
+def test_post_notify():
+    d = M.Discussion(shortname='test', name='test')
+    d.monitoring_email = 'darthvader@deathstar.org'
+    t = M.Thread.new(discussion_id=d._id, subject='Test Thread')
+    with patch('allura.model.notification.Notification.send_simple') as send:
+        t.post('This is a post')
+        send.assert_called_with(d.monitoring_email)
+
+    c.app.config.project.notifications_disabled = True
+    with patch('allura.model.notification.Notification.send_simple') as send:
+        t.post('Another post')
+        try:
+            send.assert_called_with(d.monitoring_email)
+        except AssertionError:
+            pass  # method not called as expected
+        else:
+            assert False, 'send_simple must not be called'
+
+
+@with_setup(setUp, tearDown)
+@patch('allura.model.discuss.c.project.users_with_role')
+def test_is_spam_for_admin(users):
+    users.return_value = [c.user, ]
+    d = M.Discussion(shortname='test', name='test')
+    t = M.Thread(discussion_id=d._id, subject='Test Thread')
+    t.post('This is a post')
+    post = M.Post.query.get(text='This is a post')
+    assert not t.is_spam(post), t.is_spam(post)
+
+
+@with_setup(setUp, tearDown)
+@patch('allura.model.discuss.c.project.users_with_role')
+def test_is_spam(role):
+    d = M.Discussion(shortname='test', name='test')
+    t = M.Thread(discussion_id=d._id, subject='Test Thread')
+    role.return_value = []
+    with mock.patch('allura.controllers.discuss.g.spam_checker') as spam_checker:
+        spam_checker.check.return_value = True
+        post = mock.Mock()
+        assert t.is_spam(post), t.is_spam(post)
+        assert spam_checker.check.call_count == 1, spam_checker.call_count
+
+
+@with_setup(setUp, tearDown)
+@mock.patch('allura.controllers.discuss.g.spam_checker')
+def test_not_spam_and_has_unmoderated_post_permission(spam_checker):
+    spam_checker.check.return_value = False
+    d = M.Discussion(shortname='test', name='test')
+    t = M.Thread(discussion_id=d._id, subject='Test Thread')
+    role = M.ProjectRole.by_name('*anonymous')._id
+    post_permission = M.ACE.allow(role, 'post')
+    unmoderated_post_permission = M.ACE.allow(role, 'unmoderated_post')
+    t.acl.append(post_permission)
+    t.acl.append(unmoderated_post_permission)
+    with h.push_config(c, user=M.User.anonymous()):
+        post = t.post('Hey')
+    assert_equal(post.status, 'ok')
+
+
+@with_setup(setUp, tearDown)
+@mock.patch('allura.controllers.discuss.g.spam_checker')
+@mock.patch.object(M.Thread, 'notify_moderators')
+def test_not_spam_but_has_no_unmoderated_post_permission(spam_checker, notify_moderators):
+    spam_checker.check.return_value = False
+    d = M.Discussion(shortname='test', name='test')
+    t = M.Thread(discussion_id=d._id, subject='Test Thread')
+    role = M.ProjectRole.by_name('*anonymous')._id
+    post_permission = M.ACE.allow(role, 'post')
+    t.acl.append(post_permission)
+    with h.push_config(c, user=M.User.anonymous()):
+        post = t.post('Hey')
+    assert_equal(post.status, 'pending')
+    notify_moderators.assert_called_once()
+
+
+@with_setup(setUp, tearDown)
+@mock.patch('allura.controllers.discuss.g.spam_checker')
+@mock.patch.object(M.Thread, 'notify_moderators')
+def test_spam_and_has_unmoderated_post_permission(spam_checker, notify_moderators):
+    spam_checker.check.return_value = True
+    d = M.Discussion(shortname='test', name='test')
+    t = M.Thread(discussion_id=d._id, subject='Test Thread')
+    role = M.ProjectRole.by_name('*anonymous')._id
+    post_permission = M.ACE.allow(role, 'post')
+    unmoderated_post_permission = M.ACE.allow(role, 'unmoderated_post')
+    t.acl.append(post_permission)
+    t.acl.append(unmoderated_post_permission)
+    with h.push_config(c, user=M.User.anonymous()):
+        post = t.post('Hey')
+    assert_equal(post.status, 'pending')
+    notify_moderators.assert_called_once()
+
+
+@with_setup(setUp, tearDown)
+@mock.patch('allura.controllers.discuss.g.spam_checker')
+def test_thread_subject_not_included_in_text_checked(spam_checker):
+    spam_checker.check.return_value = False
+    d = M.Discussion(shortname='test', name='test')
+    t = M.Thread(discussion_id=d._id, subject='Test Thread')
+    t.post('Hello')
+    spam_checker.check.assert_called_once()
+    assert_equal(spam_checker.check.call_args[0][0], 'Hello')
+
+
+def test_post_count():
+    d = M.Discussion(shortname='test', name='test')
+    t = M.Thread(discussion_id=d._id, subject='Test Thread')
+    M.Post(discussion_id=d._id, thread_id=t._id, status='spam')
+    M.Post(discussion_id=d._id, thread_id=t._id, status='ok')
+    M.Post(discussion_id=d._id, thread_id=t._id, status='pending')
+    ThreadLocalORMSession.flush_all()
+    assert_equal(t.post_count, 2)
+
+
+@mock.patch('allura.controllers.discuss.g.spam_checker')
+def test_spam_num_replies(spam_checker):
+    d = M.Discussion(shortname='test', name='test')
+    t = M.Thread(discussion_id=d._id, subject='Test Thread', num_replies=2)
+    p1 = M.Post(discussion_id=d._id, thread_id=t._id, status='spam')
+    p1.spam()
+    assert_equal(t.num_replies, 1)

http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/model/test_filesystem.py
----------------------------------------------------------------------
diff --git a/tests/model/test_filesystem.py b/tests/model/test_filesystem.py
new file mode 100644
index 0000000..f9e2c9e
--- /dev/null
+++ b/tests/model/test_filesystem.py
@@ -0,0 +1,220 @@
+# -*- coding: utf-8 -*-
+
+#       Licensed to the Apache Software Foundation (ASF) under one
+#       or more contributor license agreements.  See the NOTICE file
+#       distributed with this work for additional information
+#       regarding copyright ownership.  The ASF licenses this file
+#       to you under the Apache License, Version 2.0 (the
+#       "License"); you may not use this file except in compliance
+#       with the License.  You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#       Unless required by applicable law or agreed to in writing,
+#       software distributed under the License is distributed on an
+#       "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#       KIND, either express or implied.  See the License for the
+#       specific language governing permissions and limitations
+#       under the License.
+
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
+import os
+from unittest import TestCase
+from io import StringIO
+from io import BytesIO
+
+from pylons import tmpl_context as c
+from ming.orm import session, Mapper
+from nose.tools import assert_equal
+from mock import patch
+from webob import Request, Response
+
+from allura import model as M
+from alluratest.controller import setup_unit_test
+
+
+# Test-local subclass of M.File that binds the model to the main ORM session.
+class File(M.File):
+
+    class __mongometa__:
+        # Session this mapped class is attached to.
+        session = M.session.main_orm_session
+# Runs at import time, right after the class above is defined; presumably so
+# the new mapping is compiled before any test uses it (TODO confirm).
+Mapper.compile_all()
+
+
+class TestFile(TestCase):
+
+    def setUp(self):
+        setup_unit_test()
+        self.session = session(File)
+        self.conn = M.session.main_doc_session.db._connection
+        self.db = M.session.main_doc_session.db
+
+        self.db.fs.remove()
+        self.db.fs.files.remove()
+        self.db.fs.chunks.remove()
+
+    def test_from_stream(self):
+        f = File.from_stream('test1.txt', StringIO('test1'))
+        self.session.flush()
+        assert self.db.fs.count() == 1
+        assert self.db.fs.files.count() == 1
+        assert self.db.fs.chunks.count() == 1
+        assert f.filename == 'test1.txt'
+        assert f.content_type == 'text/plain'
+        self._assert_content(f, 'test1')
+
+    def test_from_data(self):
+        f = File.from_data('test2.txt', 'test2')
+        self.session.flush(f)
+        assert self.db.fs.count() == 1
+        assert self.db.fs.files.count() == 1
+        assert self.db.fs.chunks.count() == 1
+        assert f.filename == 'test2.txt'
+        assert f.content_type == 'text/plain'
+        self._assert_content(f, 'test2')
+
+    def test_from_path(self):
+        path = __file__.rstrip('c')
+        f = File.from_path(path)
+        self.session.flush()
+        assert self.db.fs.count() == 1
+        assert self.db.fs.files.count() == 1
+        assert self.db.fs.chunks.count() >= 1
+        assert f.filename == os.path.basename(path)
+        text = f.rfile().read()
+        assert text.startswith('# -*-')
+
+    def test_delete(self):
+        f = File.from_data('test1.txt', 'test1')
+        self.session.flush()
+        assert self.db.fs.count() == 1
+        assert self.db.fs.files.count() == 1
+        assert self.db.fs.chunks.count() == 1
+        f.delete()
+        self.session.flush()
+        assert self.db.fs.count() == 0
+        assert self.db.fs.files.count() == 0
+        assert self.db.fs.chunks.count() == 0
+
+    def test_remove(self):
+        File.from_data('test1.txt', 'test1')
+        File.from_data('test2.txt', 'test2')
+        self.session.flush()
+        assert self.db.fs.count() == 2
+        assert self.db.fs.files.count() == 2
+        assert self.db.fs.chunks.count() == 2
+        File.remove(dict(filename='test1.txt'))
+        self.session.flush()
+        assert self.db.fs.count() == 1
+        assert self.db.fs.files.count() == 1
+        assert self.db.fs.chunks.count() == 1
+
+    def test_overwrite(self):
+        f = File.from_data('test1.txt', 'test1')
+        self.session.flush()
+        assert self.db.fs.count() == 1
+        assert self.db.fs.files.count() == 1
+        assert self.db.fs.chunks.count() == 1
+        self._assert_content(f, 'test1')
+        with f.wfile() as fp:
+            fp.write('test2')
+        self.session.flush()
+        assert self.db.fs.count() == 1
+        assert self.db.fs.files.count() == 2
+        assert self.db.fs.chunks.count() == 2
+        self._assert_content(f, 'test2')
+
+    def test_serve_embed(self):
+        f = File.from_data('te s\u0b6e1.txt', 'test1')
+        self.session.flush()
+        with patch('allura.lib.utils.tg.request', Request.blank('/')), \
+                patch('allura.lib.utils.pylons.response', Response()) as response, \
+                patch('allura.lib.utils.etag_cache') as etag_cache:
+            response_body = list(f.serve())
+            etag_cache.assert_called_once_with('{}?{}'.format(f.filename,
+                                                               f._id.generation_time).encode('utf-8'))
+            assert_equal(['test1'], response_body)
+            assert_equal(response.content_type, f.content_type)
+            assert 'Content-Disposition' not in response.headers
+
+    def test_serve_embed_false(self):
+        f = File.from_data('te s\u0b6e1.txt', 'test1')
+        self.session.flush()
+        with patch('allura.lib.utils.tg.request', Request.blank('/')), \
+                patch('allura.lib.utils.pylons.response', Response()) as response, \
+                patch('allura.lib.utils.etag_cache') as etag_cache:
+            response_body = list(f.serve(embed=False))
+            etag_cache.assert_called_once_with('{}?{}'.format(f.filename,
+                                                               f._id.generation_time).encode('utf-8'))
+            assert_equal(['test1'], response_body)
+            assert_equal(response.content_type, f.content_type)
+            assert_equal(response.headers['Content-Disposition'],
+                         'attachment;filename="te s\xe0\xad\xae1.txt"')
+
+    def test_image(self):
+        path = os.path.join(
+            os.path.dirname(__file__), '..', 'data', 'user.png')
+        with open(path) as fp:
+            f, t = File.save_image(
+                'user.png',
+                fp,
+                thumbnail_size=(16, 16),
+                square=True,
+                save_original=True)
+        self.session.flush()
+        assert f.content_type == 'image/png'
+        assert f.is_image()
+        assert t.content_type == 'image/png'
+        assert t.is_image()
+        assert f.filename == t.filename
+        assert self.db.fs.count() == 2
+        assert self.db.fs.files.count() == 2
+        assert self.db.fs.chunks.count() == 2
+
+    def test_not_image(self):
+        f, t = File.save_image(
+            'file.txt',
+            StringIO('blah'),
+            thumbnail_size=(16, 16),
+            square=True,
+            save_original=True)
+        assert f == None
+        assert t == None
+
+    def test_invalid_image(self):
+        f, t = File.save_image(
+            'bogus.png',
+            StringIO('bogus data here!'),
+            thumbnail_size=(16, 16),
+            square=True,
+            save_original=True)
+        assert f == None
+        assert t == None
+
+    def test_partial_image_as_attachment(self):
+        path = os.path.join(os.path.dirname(__file__),
+                            '..', 'data', 'user.png')
+        fp = BytesIO(open(path, 'rb').read(500))
+        c.app.config._id = None
+        attachment = M.BaseAttachment.save_attachment('user.png', fp,
+                                                      save_original=True)
+        assert type(attachment) != tuple   # tuple is for (img, thumb) pairs
+        assert_equal(attachment.length, 500)
+        assert_equal(attachment.filename, 'user.png')
+
+    def test_attachment_name_encoding(self):
+        path = os.path.join(os.path.dirname(__file__),
+                            '..', 'data', 'user.png')
+        fp = open(path, 'rb')
+        c.app.config._id = None
+        attachment = M.BaseAttachment.save_attachment(
+            b'Strukturpr\xfcfung.dvi', fp,
+            save_original=True)
+        assert type(attachment) != tuple   # tuple is for (img, thumb) pairs
+        assert_equal(attachment.filename, 'Strukturpr\xfcfung.dvi')
+
+    def _assert_content(self, f, content):
+        result = f.rfile().read()
+        assert result == content, result

http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/model/test_monq.py
----------------------------------------------------------------------
diff --git a/tests/model/test_monq.py b/tests/model/test_monq.py
new file mode 100644
index 0000000..055bb91
--- /dev/null
+++ b/tests/model/test_monq.py
@@ -0,0 +1,46 @@
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
+#       Licensed to the Apache Software Foundation (ASF) under one
+#       or more contributor license agreements.  See the NOTICE file
+#       distributed with this work for additional information
+#       regarding copyright ownership.  The ASF licenses this file
+#       to you under the Apache License, Version 2.0 (the
+#       "License"); you may not use this file except in compliance
+#       with the License.  You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#       Unless required by applicable law or agreed to in writing,
+#       software distributed under the License is distributed on an
+#       "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#       KIND, either express or implied.  See the License for the
+#       specific language governing permissions and limitations
+#       under the License.
+
+import pprint
+from nose.tools import with_setup
+
+from ming.orm import ThreadLocalORMSession
+
+from alluratest.controller import setup_basic_test, setup_global_objects
+from allura import model as M
+
+
+def setUp():
+    """Prepare the test environment and start with an empty MonQTask collection."""
+    setup_basic_test()
+    ThreadLocalORMSession.close_all()
+    setup_global_objects()
+    # Remove every existing task so the test only sees the one it posts.
+    M.MonQTask.query.remove({})
+
+
+@with_setup(setUp)
+def test_basic_task():
+    task = M.MonQTask.post(pprint.pformat, ([5, 6],))
+    ThreadLocalORMSession.flush_all()
+    ThreadLocalORMSession.close_all()
+    task = M.MonQTask.get()
+    assert task
+    task()
+    assert task.result == 'I[5, 6]', task.result

http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/model/test_neighborhood.py
----------------------------------------------------------------------
diff --git a/tests/model/test_neighborhood.py b/tests/model/test_neighborhood.py
new file mode 100644
index 0000000..0177c26
--- /dev/null
+++ b/tests/model/test_neighborhood.py
@@ -0,0 +1,97 @@
+# -*- coding: utf-8 -*-
+
+#       Licensed to the Apache Software Foundation (ASF) under one
+#       or more contributor license agreements.  See the NOTICE file
+#       distributed with this work for additional information
+#       regarding copyright ownership.  The ASF licenses this file
+#       to you under the Apache License, Version 2.0 (the
+#       "License"); you may not use this file except in compliance
+#       with the License.  You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#       Unless required by applicable law or agreed to in writing,
+#       software distributed under the License is distributed on an
+#       "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#       KIND, either express or implied.  See the License for the
+#       specific language governing permissions and limitations
+#       under the License.
+
+"""
+Model tests for neighborhood
+"""
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
+from nose.tools import with_setup
+
+from allura import model as M
+from allura.tests import decorators as td
+from alluratest.controller import setup_basic_test, setup_global_objects
+
+
+def setUp():
+    """Run the basic test setup, then setup_with_tools()."""
+    setup_basic_test()
+    setup_with_tools()
+
+
+@td.with_wiki
+def setup_with_tools():
+    """Set up the global objects; runs under the td.with_wiki decorator."""
+    setup_global_objects()
+
+
+@with_setup(setUp)
+def test_neighborhood():
+    neighborhood = M.Neighborhood.query.get(name='Projects')
+    # Check css output depends of neighborhood level
+    test_css = ".text{color:#000;}"
+    neighborhood.css = test_css
+    neighborhood.features['css'] = 'none'
+    assert neighborhood.get_custom_css() == ""
+    neighborhood.features['css'] = 'picker'
+    assert neighborhood.get_custom_css() == test_css
+    neighborhood.features['css'] = 'custom'
+    assert neighborhood.get_custom_css() == test_css
+    # Check max projects
+    neighborhood.features['max_projects'] = None
+    assert neighborhood.get_max_projects() is None
+    neighborhood.features['max_projects'] = 500
+    assert neighborhood.get_max_projects() == 500
+
+    # Check picker css styles
+    test_css_dict = {'barontop': '#444',
+                     'titlebarbackground': '#555',
+                     'projecttitlefont': 'arial,sans-serif',
+                     'projecttitlecolor': '#333',
+                     'titlebarcolor': '#666',
+                     'addopt-icon-theme': 'dark'}
+    css_text = neighborhood.compile_css_for_picker(test_css_dict)
+    assert '#333' in css_text
+    assert '#444' in css_text
+    assert '#555' in css_text
+    assert '#666' in css_text
+    assert 'arial,sans-serif' in css_text
+    assert 'images/neo-icon-set-ffffff-256x350.png' in css_text
+    neighborhood.css = css_text
+    styles_list = neighborhood.get_css_for_picker()
+    for style in styles_list:
+        assert test_css_dict[style['name']] == style['value']
+        if style['name'] == 'titlebarcolor':
+            assert '<option value="dark" selected="selected">' in style[
+                'additional']
+
+    # Check neighborhood custom css showing
+    neighborhood.features['css'] = 'none'
+    assert not neighborhood.allow_custom_css
+    neighborhood.features['css'] = 'picker'
+    assert neighborhood.allow_custom_css
+    neighborhood.features['css'] = 'custom'
+    assert neighborhood.allow_custom_css
+
+    neighborhood.anchored_tools = 'wiki:Wiki, tickets:Tickets'
+    assert neighborhood.get_anchored_tools()['wiki'] == 'Wiki'
+    assert neighborhood.get_anchored_tools()['tickets'] == 'Tickets'
+
+    neighborhood.prohibited_tools = 'wiki, tickets'
+    assert neighborhood.get_prohibited_tools() == ['wiki', 'tickets']

http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/model/test_notification.py
----------------------------------------------------------------------
diff --git a/tests/model/test_notification.py b/tests/model/test_notification.py
new file mode 100644
index 0000000..e5c1aca
--- /dev/null
+++ b/tests/model/test_notification.py
@@ -0,0 +1,487 @@
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
+#       Licensed to the Apache Software Foundation (ASF) under one
+#       or more contributor license agreements.  See the NOTICE file
+#       distributed with this work for additional information
+#       regarding copyright ownership.  The ASF licenses this file
+#       to you under the Apache License, Version 2.0 (the
+#       "License"); you may not use this file except in compliance
+#       with the License.  You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#       Unless required by applicable law or agreed to in writing,
+#       software distributed under the License is distributed on an
+#       "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#       KIND, either express or implied.  See the License for the
+#       specific language governing permissions and limitations
+#       under the License.
+
+import unittest
+from datetime import timedelta
+import collections
+
+from pylons import tmpl_context as c, app_globals as g
+from nose.tools import assert_equal, assert_in
+from ming.orm import ThreadLocalORMSession
+import mock
+import bson
+
+from alluratest.controller import setup_basic_test, setup_global_objects
+from allura import model as M
+from allura.model.notification import MailFooter
+from allura.lib import helpers as h
+from allura.tests import decorators as td
+from forgewiki import model as WM
+
+
+class TestNotification(unittest.TestCase):
+
+    def setUp(self):
+        setup_basic_test()
+        self.setup_with_tools()
+
+    @td.with_wiki
+    def setup_with_tools(self):
+        setup_global_objects()
+        _clear_subscriptions()
+        _clear_notifications()
+        ThreadLocalORMSession.flush_all()
+        ThreadLocalORMSession.close_all()
+        M.notification.MAILBOX_QUIESCENT = None  # disable message combining
+
+    def test_subscribe_unsubscribe(self):
+        M.Mailbox.subscribe(type='direct')
+        ThreadLocalORMSession.flush_all()
+        ThreadLocalORMSession.close_all()
+        subscriptions = M.Mailbox.query.find(dict(
+            project_id=c.project._id,
+            app_config_id=c.app.config._id,
+            user_id=c.user._id)).all()
+        assert len(subscriptions) == 1
+        assert subscriptions[0].type == 'direct'
+        assert M.Mailbox.query.find().count() == 1
+        M.Mailbox.unsubscribe()
+        ThreadLocalORMSession.flush_all()
+        ThreadLocalORMSession.close_all()
+        subscriptions = M.Mailbox.query.find(dict(
+            project_id=c.project._id,
+            app_config_id=c.app.config._id,
+            user_id=c.user._id)).all()
+        assert len(subscriptions) == 0
+        assert M.Mailbox.query.find().count() == 0
+
+    @mock.patch('allura.tasks.mail_tasks.sendmail')
+    def test_send_direct(self, sendmail):
+        c.user = M.User.query.get(username='test-user')
+        wiki = c.project.app_instance('wiki')
+        page = WM.Page.query.get(app_config_id=wiki.config._id)
+        notification = M.Notification(
+            _id='_id',
+            ref=page.ref,
+            from_address='from_address',
+            reply_to_address='reply_to_address',
+            in_reply_to='in_reply_to',
+            references=['a'],
+            subject='subject',
+            text='text',
+        )
+        notification.footer = lambda: ' footer'
+        notification.send_direct(c.user._id)
+        sendmail.post.assert_called_once_with(
+            destinations=[str(c.user._id)],
+            fromaddr='from_address',
+            reply_to='reply_to_address',
+            subject='subject',
+            message_id='_id',
+            in_reply_to='in_reply_to',
+            references=['a'],
+            sender='wiki@test.p.in.localhost',
+            text='text footer',
+        )
+
+    @mock.patch('allura.tasks.mail_tasks.sendmail')
+    def test_send_direct_no_access(self, sendmail):
+        c.user = M.User.query.get(username='test-user')
+        wiki = c.project.app_instance('wiki')
+        page = WM.Page.query.get(app_config_id=wiki.config._id)
+        page.parent_security_context().acl = []
+        ThreadLocalORMSession.flush_all()
+        ThreadLocalORMSession.close_all()
+        notification = M.Notification(
+            _id='_id',
+            ref=page.ref,
+            from_address='from_address',
+            reply_to_address='reply_to_address',
+            in_reply_to='in_reply_to',
+            subject='subject',
+            text='text',
+        )
+        notification.footer = lambda: ' footer'
+        notification.send_direct(c.user._id)
+        assert_equal(sendmail.post.call_count, 0)
+
+    @mock.patch('allura.tasks.mail_tasks.sendmail')
+    def test_send_direct_wrong_project_context(self, sendmail):
+        """
+        Test that Notification.send_direct() works as expected even
+        if c.project is wrong.
+
+        This can happen when a notify task is triggered on project A (thus
+        setting c.project to A) and then calls Mailbox.fire_ready() which fires
+        pending Notifications on any waiting Mailbox, regardless of project,
+        but doesn't update c.project.
+        """
+        project1 = c.project
+        project2 = M.Project.query.get(shortname='test2')
+        assert_equal(project1.shortname, 'test')
+        c.user = M.User.query.get(username='test-user')
+        wiki = project1.app_instance('wiki')
+        page = WM.Page.query.get(app_config_id=wiki.config._id)
+        notification = M.Notification(
+            _id='_id',
+            ref=page.ref,
+            from_address='from_address',
+            reply_to_address='reply_to_address',
+            in_reply_to='in_reply_to',
+            references=['a'],
+            subject='subject',
+            text='text',
+        )
+        notification.footer = lambda: ' footer'
+        c.project = project2
+        notification.send_direct(c.user._id)
+        sendmail.post.assert_called_once_with(
+            destinations=[str(c.user._id)],
+            fromaddr='from_address',
+            reply_to='reply_to_address',
+            subject='subject',
+            message_id='_id',
+            in_reply_to='in_reply_to',
+            references=['a'],
+            sender='wiki@test.p.in.localhost',
+            text='text footer',
+        )
+
+
+class TestPostNotifications(unittest.TestCase):
+
+    def setUp(self):
+        setup_basic_test()
+        self.setup_with_tools()
+
+    @td.with_wiki
+    def setup_with_tools(self):
+        setup_global_objects()
+        g.set_app('wiki')
+        _clear_subscriptions()
+        _clear_notifications()
+        ThreadLocalORMSession.flush_all()
+        ThreadLocalORMSession.close_all()
+        self.pg = WM.Page.query.get(app_config_id=c.app.config._id)
+        M.notification.MAILBOX_QUIESCENT = None  # disable message combining
+        while M.MonQTask.run_ready('setup'):
+            ThreadLocalORMSession.flush_all()
+
+    def test_post_notification(self):
+        self._post_notification()
+        ThreadLocalORMSession.flush_all()
+        M.MonQTask.list()
+        t = M.MonQTask.get()
+        assert t.args[1] == self.pg.index_id()
+
+    def test_post_user_notification(self):
+        u = M.User.query.get(username='test-admin')
+        M.Notification.post_user(u, self.pg, 'metadata')
+        ThreadLocalORMSession.flush_all()
+        ThreadLocalORMSession.close_all()
+        flash_msgs = list(h.pop_user_notifications(u))
+        assert len(flash_msgs) == 1, flash_msgs
+        msg = flash_msgs[0]
+        assert msg['text'].startswith('Home modified by Test Admin')
+        assert msg['subject'].startswith('[test:wiki]')
+        flash_msgs = list(h.pop_user_notifications(u))
+        assert not flash_msgs, flash_msgs
+
+    def test_delivery(self):
+        self._subscribe()
+        self._post_notification()
+        M.MonQTask.run_ready()
+        ThreadLocalORMSession.flush_all()
+        M.MonQTask.run_ready()
+        ThreadLocalORMSession.flush_all()
+        assert M.Mailbox.query.find().count() == 1
+        mbox = M.Mailbox.query.get()
+        assert len(mbox.queue) == 1
+        assert not mbox.queue_empty
+
+    def test_email(self):
+        self._subscribe()  # as current user: test-admin
+        user2 = M.User.query.get(username='test-user-2')
+        self._subscribe(user=user2)
+        self._post_notification()
+        ThreadLocalORMSession.flush_all()
+
+        assert_equal(M.Notification.query.get()
+                     ['from_address'], '"Test Admin" <te...@users.localhost>')
+        assert_equal(M.Mailbox.query.find().count(), 2)
+
+        # sends the notification out into "mailboxes", and from mailboxes into
+        # email tasks
+        M.MonQTask.run_ready()
+        mboxes = M.Mailbox.query.find().all()
+        assert_equal(len(mboxes), 2)
+        assert_equal(len(mboxes[0].queue), 1)
+        assert not mboxes[0].queue_empty
+        assert_equal(len(mboxes[1].queue), 1)
+        assert not mboxes[1].queue_empty
+
+        email_tasks = M.MonQTask.query.find({'state': 'ready'}).all()
+        # make sure both subscribers will get an email
+        assert_equal(len(email_tasks), 2)
+
+        first_destinations = [e.kwargs['destinations'][0] for e in email_tasks]
+        assert_in(str(c.user._id), first_destinations)
+        assert_in(str(user2._id), first_destinations)
+        assert_equal(email_tasks[0].kwargs['fromaddr'],
+                     '"Test Admin" <te...@users.localhost>')
+        assert_equal(email_tasks[1].kwargs['fromaddr'],
+                     '"Test Admin" <te...@users.localhost>')
+        assert_equal(email_tasks[0].kwargs['sender'],
+                     'wiki@test.p.in.localhost')
+        assert_equal(email_tasks[1].kwargs['sender'],
+                     'wiki@test.p.in.localhost')
+        assert email_tasks[0].kwargs['text'].startswith(
+            'Home modified by Test Admin')
+        assert 'you indicated interest in ' in email_tasks[0].kwargs['text']
+
+    def test_permissions(self):
+        # Notification should only be delivered if user has read perms on the
+        # artifact. The perm check happens just before the mail task is
+        # posted.
+        u = M.User.query.get(username='test-admin')
+        self._subscribe(user=u)
+        # Simulate a permission check failure.
+
+        def patched_has_access(*args, **kw):
+            def predicate(*args, **kw):
+                return False
+            return predicate
+        from allura.model.notification import security
+        orig = security.has_access
+        security.has_access = patched_has_access
+        try:
+            # this will create a notification task
+            self._post_notification()
+            ThreadLocalORMSession.flush_all()
+            # running the notification task will create a mail task if the
+            # permission check passes...
+            M.MonQTask.run_ready()
+            ThreadLocalORMSession.flush_all()
+            # ...but in this case it doesn't create a mail task since we
+            # forced the perm check to fail
+            assert M.MonQTask.get() == None
+        finally:
+            security.has_access = orig
+
+    def test_footer(self):
+        footer = MailFooter.monitored(
+            'test@mail.com',
+            'http://test1.com',
+            'http://test2.com')
+        assert 'test@mail.com is subscribed to http://test1.com' in footer
+        assert 'admin can change settings at http://test2.com' in footer
+        footer = MailFooter.standard(M.Notification())
+        self.assertIn('Sent from localhost because you indicated interest',
+                      footer)
+
+    def _subscribe(self, **kw):
+        self.pg.subscribe(type='direct', **kw)
+        ThreadLocalORMSession.flush_all()
+        ThreadLocalORMSession.close_all()
+
+    def _post_notification(self):
+        return M.Notification.post(self.pg, 'metadata')
+
+
+class TestSubscriptionTypes(unittest.TestCase):
+    # Exercises Mailbox subscriptions of type 'direct', 'digest' and
+    # 'summary' on the wiki page loaded in setup_with_tools, and checks
+    # direct/digest sending for disabled and re-enabled users.
+
+    def setUp(self):
+        setup_basic_test()
+        self.setup_with_tools()
+
+    @td.with_wiki
+    def setup_with_tools(self):
+        setup_global_objects()
+        g.set_app('wiki')
+        # These bare names resolve to the module-level helpers, not to the
+        # _clear_subscriptions method defined further down in this class.
+        _clear_subscriptions()
+        _clear_notifications()
+        ThreadLocalORMSession.flush_all()
+        ThreadLocalORMSession.close_all()
+        # Page used by every test in this class as the subscription target.
+        self.pg = WM.Page.query.get(app_config_id=c.app.config._id)
+        M.notification.MAILBOX_QUIESCENT = None  # disable message combining
+
+    def test_direct_sub(self):
+        # No assertions: passes as long as nothing raises.
+        self._subscribe()
+        self._post_notification(text='A')
+        self._post_notification(text='B')
+        ThreadLocalORMSession.flush_all()
+        ThreadLocalORMSession.close_all()
+        M.Mailbox.fire_ready()
+
+    def test_digest_sub(self):
+        # No assertions: passes as long as nothing raises.
+        self._subscribe(type='digest')
+        self._post_notification(text='x' * 1024)
+        self._post_notification()
+        M.Mailbox.fire_ready()
+
+    def test_summary_sub(self):
+        # No assertions: passes as long as nothing raises.
+        self._subscribe(type='summary')
+        self._post_notification(text='x' * 1024)
+        self._post_notification()
+        M.Mailbox.fire_ready()
+
+    def test_message(self):
+        self._test_message()
+
+        # Re-run the fixture by hand and repeat the same scenario.
+        self.setUp()
+        self._test_message()
+
+        # setUp resets MAILBOX_QUIESCENT to None, so it is overridden only
+        # after the fixture has been rebuilt.
+        self.setUp()
+        M.notification.MAILBOX_QUIESCENT = timedelta(minutes=1)
+        # will raise "assert msg is not None" since the new message is not 1
+        # min old:
+        self.assertRaises(AssertionError, self._test_message)
+
+    def _test_message(self):
+        # Posts a message on the page's thread, runs the ready tasks, fires
+        # the mailboxes, then inspects the resulting ready sendmail task.
+        self._subscribe()
+        thd = M.Thread.query.get(ref_id=self.pg.index_id())
+        thd.post('This is a very cool message')
+        M.MonQTask.run_ready()
+        ThreadLocalORMSession.flush_all()
+        M.Mailbox.fire_ready()
+        ThreadLocalORMSession.flush_all()
+        ThreadLocalORMSession.close_all()
+        msg = M.MonQTask.query.get(
+            task_name='allura.tasks.mail_tasks.sendmail',
+            state='ready')
+        assert msg is not None
+        assert 'Home@wiki.test.p' in msg.kwargs['reply_to']
+        u = M.User.by_username('test-admin')
+        # The second operand is the failure message: it shows the actual
+        # fromaddr when the assertion fails.
+        assert str(u._id) in msg.kwargs['fromaddr'], msg.kwargs['fromaddr']
+
+    # NOTE(review): nothing visible in this class calls
+    # self._clear_subscriptions(); setup_with_tools calls the module-level
+    # function of the same name instead.
+    def _clear_subscriptions(self):
+        M.Mailbox.query.remove({})
+        ThreadLocalORMSession.flush_all()
+        ThreadLocalORMSession.close_all()
+
+    # NOTE(review): the ``type`` parameter shadows the builtin; callers in
+    # this class pass it by keyword, so renaming would change the interface.
+    def _subscribe(self, type='direct', topic=None):
+        self.pg.subscribe(type=type, topic=topic)
+        ThreadLocalORMSession.flush_all()
+        ThreadLocalORMSession.close_all()
+
+    def _post_notification(self, text=None):
+        return M.Notification.post(self.pg, 'metadata', text=text)
+
+    # Stacked patch decorators are applied bottom-up, so the first mock
+    # argument belongs to the Notification patch and the second to the
+    # defaultdict patch.
+    @mock.patch('allura.model.notification.defaultdict')
+    @mock.patch('allura.model.notification.Notification')
+    def test_direct_accumulation(self, mocked_notification, mocked_defaultdict):
+        # Insertion-ordered stand-in for defaultdict: a missing key is
+        # created on first access using the factory.
+        # NOTE(review): presumably used so the grouping done inside
+        # Mailbox.fire is iterated in a deterministic order -- confirm.
+        class OrderedDefaultDict(collections.OrderedDict):
+
+            def __init__(self, factory=list, *a, **kw):
+                self._factory = factory
+                super(OrderedDefaultDict, self).__init__(*a, **kw)
+
+            def __getitem__(self, key):
+                if key not in self:
+                    value = self[key] = self._factory()
+                else:
+                    value = super(OrderedDefaultDict, self).__getitem__(key)
+                return value
+
+        # The patched Notification.query.find(...).all() returns these five
+        # mock notifications.
+        notifications = mocked_notification.query.find.return_value.all.return_value = [
+            mock.Mock(_id='n0', topic='metadata', subject='s1',
+                      from_address='f1', reply_to_address='rt1', author_id='a1'),
+            mock.Mock(_id='n1', topic='metadata', subject='s2',
+                      from_address='f2', reply_to_address='rt2', author_id='a2'),
+            mock.Mock(_id='n2', topic='metadata', subject='s2',
+                      from_address='f2', reply_to_address='rt2', author_id='a2'),
+            mock.Mock(_id='n3', topic='message', subject='s3',
+                      from_address='f3', reply_to_address='rt3', author_id='a3'),
+            mock.Mock(_id='n4', topic='message', subject='s3',
+                      from_address='f3', reply_to_address='rt3', author_id='a3'),
+        ]
+        mocked_defaultdict.side_effect = OrderedDefaultDict
+
+        u0 = bson.ObjectId()
+        mbox = M.Mailbox(type='direct', user_id=u0,
+                         queue=['n0', 'n1', 'n2', 'n3', 'n4'])
+        mbox.fire('now')
+
+        mocked_notification.query.find.assert_called_once_with(
+            {'_id': {'$in': ['n0', 'n1', 'n2', 'n3', 'n4']}})
+        # first notification should be sent direct, as its key values are
+        # unique
+        notifications[0].send_direct.assert_called_once_with(u0)
+        # next two notifications should be sent as a digest as they have
+        # matching key values
+        mocked_notification.send_digest.assert_called_once_with(
+            u0, 'f2', 's2', [notifications[1], notifications[2]], 'rt2')
+        # final two should be sent direct even though they have matching
+        # keys, as they are messages
+        notifications[3].send_direct.assert_called_once_with(u0)
+        notifications[4].send_direct.assert_called_once_with(u0)
+
+    def test_send_direct_disabled_user(self):
+        user = M.User.by_username('test-admin')
+        thd = M.Thread.query.get(ref_id=self.pg.index_id())
+        notification = M.Notification()
+        notification.ref_id = thd.index_id()
+        # While the user is disabled, send_direct must not leave a ready
+        # sendmail task behind.
+        user.disabled = True
+        ThreadLocalORMSession.flush_all()
+        notification.send_direct(user._id)
+        count = M.MonQTask.query.find(dict(
+            task_name='allura.tasks.mail_tasks.sendmail',
+            state='ready')).count()
+        assert_equal(count, 0)
+        # Once re-enabled, the same call must produce exactly one.
+        user.disabled = False
+        ThreadLocalORMSession.flush_all()
+        notification.send_direct(user._id)
+        count = M.MonQTask.query.find(dict(
+            task_name='allura.tasks.mail_tasks.sendmail',
+            state='ready')).count()
+        assert_equal(count, 1)
+
+    @mock.patch('allura.model.notification.Notification.ref')
+    def test_send_digest_disabled_user(self, ref):
+        thd = M.Thread.query.get(ref_id=self.pg.index_id())
+        notification = M.Notification()
+        notification.ref_id = thd.index_id()
+        # Notification.ref is patched; its artifact is pointed at the thread.
+        ref.artifact = thd
+        user = M.User.by_username('test-admin')
+        # While the user is disabled, send_digest must not leave a ready
+        # sendmail task behind.
+        user.disabled = True
+        ThreadLocalORMSession.flush_all()
+        M.Notification.send_digest(
+            user._id, 'test@mail.com', 'subject', [notification])
+        count = M.MonQTask.query.find(dict(
+            task_name='allura.tasks.mail_tasks.sendmail',
+            state='ready')).count()
+        assert_equal(count, 0)
+        # Once re-enabled, the same call must produce exactly one.
+        user.disabled = False
+        ThreadLocalORMSession.flush_all()
+        M.Notification.send_digest(
+            user._id, 'test@mail.com', 'subject', [notification])
+        count = M.MonQTask.query.find(dict(
+            task_name='allura.tasks.mail_tasks.sendmail',
+            state='ready')).count()
+        assert_equal(count, 1)
+
+
+def _clear_subscriptions():
+        M.Mailbox.query.remove({})
+
+
+def _clear_notifications():
+        M.Notification.query.remove({})

http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/model/test_project.py
----------------------------------------------------------------------
diff --git a/tests/model/test_project.py b/tests/model/test_project.py
new file mode 100644
index 0000000..f212d9b
--- /dev/null
+++ b/tests/model/test_project.py
@@ -0,0 +1,191 @@
+# -*- coding: utf-8 -*-
+
+#       Licensed to the Apache Software Foundation (ASF) under one
+#       or more contributor license agreements.  See the NOTICE file
+#       distributed with this work for additional information
+#       regarding copyright ownership.  The ASF licenses this file
+#       to you under the Apache License, Version 2.0 (the
+#       "License"); you may not use this file except in compliance
+#       with the License.  You may obtain a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#       Unless required by applicable law or agreed to in writing,
+#       software distributed under the License is distributed on an
+#       "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+#       KIND, either express or implied.  See the License for the
+#       specific language governing permissions and limitations
+#       under the License.
+
+"""
+Model tests for project
+"""
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
+from nose import with_setup
+from nose.tools import assert_equals, assert_in
+from pylons import tmpl_context as c
+from ming.orm.ormsession import ThreadLocalORMSession
+
+from allura import model as M
+from allura.lib import helpers as h
+from allura.tests import decorators as td
+from alluratest.controller import setup_basic_test, setup_global_objects
+from allura.lib.exceptions import ToolError, Invalid
+from mock import MagicMock, patch
+
+
+def setUp():
+    """Module-level fixture: basic test setup, then the wiki-aware setup."""
+    setup_basic_test()
+    setup_with_tools()
+
+
+@td.with_wiki
+def setup_with_tools():
+    """Set up the global objects, wrapped by the ``td.with_wiki`` decorator."""
+    setup_global_objects()
+
+
+def test_project():
+    # Broad walk through the Project API: menus and sitemap, context
+    # switching, URL generation, and the rules enforced by install_app /
+    # uninstall_app.
+    assert_equals(type(c.project.sidebar_menu()), list)
+    assert_in(c.project.script_name, c.project.url())
+    old_proj = c.project
+    # Switch context to the 'test/sub1' subproject; the previous project
+    # must then show up among its parents.
+    h.set_context('test/sub1', neighborhood='Projects')
+    assert_equals(type(c.project.sidebar_menu()), list)
+    assert_equals(type(c.project.sitemap()), list)
+    assert_equals(c.project.sitemap()[1].label, 'Admin')
+    assert_in(old_proj, list(c.project.parent_iter()))
+    # Back to the 'test' project with the wiki app in context.
+    h.set_context('test', 'wiki', neighborhood='Projects')
+    adobe_nbhd = M.Neighborhood.query.get(name='Adobe')
+    p = M.Project.query.get(
+        shortname='adobe-1', neighborhood_id=adobe_nbhd._id)
+    # assert 'http' in p.url() # We moved adobe into /adobe/, not
+    # http://adobe....
+    assert_in(p.script_name, p.url())
+    assert_equals(c.project.shortname, 'test')
+    assert_in('<p>', c.project.description_html)
+    c.project.uninstall_app('hello-test-mount-point')
+    ThreadLocalORMSession.flush_all()
+
+    c.project.install_app('Wiki', 'hello-test-mount-point')
+    # Point the support page at the new tool; it is expected to be reset
+    # once that tool is uninstalled (asserted further down).
+    c.project.support_page = 'hello-test-mount-point'
+    assert_equals(c.project.app_config('wiki').tool_name, 'wiki')
+    ThreadLocalORMSession.flush_all()
+    with td.raises(ToolError):
+        # already installed
+        c.project.install_app('Wiki', 'hello-test-mount-point')
+    ThreadLocalORMSession.flush_all()
+    c.project.uninstall_app('hello-test-mount-point')
+    ThreadLocalORMSession.flush_all()
+    with td.raises(ToolError):
+        # mount point reserved
+        c.project.install_app('Wiki', 'feed')
+    with td.raises(ToolError):
+        # mount point too long
+        c.project.install_app('Wiki', 'a' * 64)
+    with td.raises(ToolError):
+        # mount point must begin with letter
+        c.project.install_app('Wiki', '1')
+    # single letter mount points are allowed
+    c.project.install_app('Wiki', 'a')
+    # Make sure the project support page is reset if the tool it was pointing
+    # to is uninstalled.
+    assert c.project.support_page == ''
+    # NOTE(review): the results below are never inspected (app_inst is
+    # overwritten and not read); these calls only check that nothing raises.
+    app_config = c.project.app_config('hello')
+    app_inst = c.project.app_instance(app_config)
+    app_inst = c.project.app_instance('hello')
+    app_inst = c.project.app_instance('hello2123')
+    c.project.breadcrumbs()
+    c.app.config.breadcrumbs()
+
+
+def test_project_index():
+    project, idx = c.project, c.project.index()
+    assert 'id' in idx
+    assert idx['id'] == project.index_id()
+    assert 'title' in idx
+    assert 'type_s' in idx
+    assert 'deleted_b' in idx
+    assert 'private_b' in idx
+    assert 'neighborhood_id_s' in idx
+    assert 'short_description_t' in idx
+    assert 'url_s' in idx
+
+
+def test_subproject():
+    project = M.Project.query.get(shortname='test')
+    with td.raises(ToolError):
+        with patch('allura.lib.plugin.ProjectRegistrationProvider') as Provider:
+            Provider.get().shortname_validator.to_python.side_effect = Invalid(
+                'name', 'value', {})
+            # name doesn't validate
+            sp = project.new_subproject('test-proj-nose')
+    sp = project.new_subproject('test-proj-nose')
+    spp = sp.new_subproject('spp')
+    ThreadLocalORMSession.flush_all()
+    sp.delete()
+    ThreadLocalORMSession.flush_all()
+
+
+@td.with_wiki
+def test_anchored_tools():
+    c.project.neighborhood.anchored_tools = 'wiki:Wiki, tickets:Ticket'
+    c.project.install_app = MagicMock()
+    assert_equals(c.project.sitemap()[0].label, 'Wiki')
+    assert_equals(c.project.install_app.call_args[0][0], 'tickets')
+    assert_equals(c.project.ordered_mounts()[0]['ac'].tool_name, 'wiki')
+
+
+def test_set_ordinal_to_admin_tool():
+    with h.push_config(c,
+                       user=M.User.by_username('test-admin'),
+                       project=M.Project.query.get(shortname='test')):
+        sm = c.project.sitemap()
+        assert_equals(sm[-1].tool_name, 'admin')
+
+
+@with_setup(setUp)
+def test_users_and_roles():
+    p = M.Project.query.get(shortname='test')
+    sub = p.direct_subprojects[0]
+    u = M.User.by_username('test-admin')
+    assert p.users_with_role('Admin') == [u]
+    assert p.users_with_role('Admin') == sub.users_with_role('Admin')
+    assert p.users_with_role('Admin') == p.admins()
+
+    user = p.admins()[0]
+    user.disabled = True
+    ThreadLocalORMSession.flush_all()
+    assert p.users_with_role('Admin') == []
+    assert p.users_with_role('Admin') == p.admins()
+
+
+@with_setup(setUp)
+def test_project_disabled_users():
+    p = M.Project.query.get(shortname='test')
+    users = p.users()
+    assert users[0].username == 'test-admin'
+    user = M.User.by_username('test-admin')
+    user.disabled = True
+    ThreadLocalORMSession.flush_all()
+    users = p.users()
+    assert users == []
+
+def test_screenshot_unicode_serialization():
+    p = M.Project.query.get(shortname='test')
+    screenshot_unicode = M.ProjectFile(project_id=p._id, category='screenshot', caption="ConSelección", filename='ConSelección.jpg')
+    screenshot_ascii = M.ProjectFile(project_id=p._id, category='screenshot', caption='test-screenshot', filename='test_file.jpg')
+    ThreadLocalORMSession.flush_all()
+
+    serialized = p.__json__()
+    screenshots = sorted(serialized['screenshots'], key=lambda k: k['caption'])
+
+    assert len(screenshots) == 2
+    assert screenshots[0]['url'] == 'http://localhost/p/test/screenshot/ConSelecci%C3%B3n.jpg'
+    assert screenshots[0]['caption'] == "ConSelección"
+    assert screenshots[0]['thumbnail_url'] == 'http://localhost/p/test/screenshot/ConSelecci%C3%B3n.jpg/thumb'
+
+    assert screenshots[1]['url'] == 'http://localhost/p/test/screenshot/test_file.jpg'
+    assert screenshots[1]['caption'] == 'test-screenshot'
+    assert screenshots[1]['thumbnail_url'] == 'http://localhost/p/test/screenshot/test_file.jpg/thumb'