You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@allura.apache.org by he...@apache.org on 2015/05/29 22:40:33 UTC
[11/45] allura git commit: [#7878] Used 2to3 to see what issues would
come up
http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/functional/test_rest.py
----------------------------------------------------------------------
diff --git a/tests/functional/test_rest.py b/tests/functional/test_rest.py
new file mode 100644
index 0000000..41efaaa
--- /dev/null
+++ b/tests/functional/test_rest.py
@@ -0,0 +1,355 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
+from pylons import app_globals as g
+import mock
+from nose.tools import assert_equal, assert_in, assert_not_in
+from ming.odm import ThreadLocalODMSession
+
+from allura.tests import decorators as td
+from alluratest.controller import TestRestApiBase
+from allura.lib import helpers as h
+from allura.lib.exceptions import Invalid
+from allura import model as M
+
+
+class TestRestHome(TestRestApiBase):
+
+ def _patch_token(self, OAuthAccessToken):
+ at = OAuthAccessToken.return_value
+ at.__ming__ = mock.MagicMock()
+ at.api_key = 'foo'
+
+ @mock.patch('allura.controllers.rest.M.OAuthAccessToken')
+ @mock.patch('allura.controllers.rest.request')
+ def test_bearer_token_non_bearer(self, request, OAuthAccessToken):
+ request.headers = {}
+ request.params = {'access_token': 'foo'}
+ request.scheme = 'https'
+ self._patch_token(OAuthAccessToken)
+ access_token = OAuthAccessToken.query.get.return_value
+ access_token.is_bearer = False
+ r = self.api_post('/rest/p/test/wiki', access_token='foo')
+ assert_equal(r.status_int, 403)
+ OAuthAccessToken.query.get.assert_called_once_with(api_key='foo')
+
+ @mock.patch('allura.controllers.rest.M.OAuthAccessToken')
+ @mock.patch('allura.controllers.rest.request')
+ def test_bearer_token_invalid(self, request, OAuthAccessToken):
+ request.headers = {}
+ request.params = {'access_token': 'foo'}
+ request.scheme = 'https'
+ self._patch_token(OAuthAccessToken)
+ OAuthAccessToken.query.get.return_value = None
+ r = self.api_post('/rest/p/test/wiki', access_token='foo')
+ assert_equal(r.status_int, 403)
+
+ @mock.patch('allura.controllers.rest.request')
+ @td.with_wiki
+ def test_bearer_token_valid(self, request):
+ user = M.User.by_username('test-admin')
+ consumer_token = M.OAuthConsumerToken(
+ name='foo',
+ description='foo app',
+ )
+ request_token = M.OAuthRequestToken(
+ consumer_token_id=consumer_token._id,
+ user_id=user._id,
+ callback='manual',
+ validation_pin=h.nonce(20),
+ is_bearer=True,
+ )
+ access_token = M.OAuthAccessToken(
+ consumer_token_id=consumer_token._id,
+ request_token_id=request_token._id,
+ user_id=user._id,
+ is_bearer=True,
+ )
+ ThreadLocalODMSession.flush_all()
+ request.headers = {}
+ request.params = {'access_token': access_token.api_key}
+ request.scheme = 'https'
+ r = self.api_post('/rest/p/test/wiki', access_token='foo')
+ assert_equal(r.status_int, 200)
+
+ @mock.patch('allura.controllers.rest.M.OAuthAccessToken')
+ @mock.patch('allura.controllers.rest.request')
+ def test_bearer_token_non_bearer_via_headers(self, request, OAuthAccessToken):
+ request.headers = {
+ 'Authorization': 'Bearer foo'
+ }
+ request.scheme = 'https'
+ self._patch_token(OAuthAccessToken)
+ access_token = OAuthAccessToken.query.get.return_value
+ access_token.is_bearer = False
+ r = self.api_post('/rest/p/test/wiki', access_token='foo')
+ assert_equal(r.status_int, 403)
+ OAuthAccessToken.query.get.assert_called_once_with(api_key='foo')
+
+ @mock.patch('allura.controllers.rest.M.OAuthAccessToken')
+ @mock.patch('allura.controllers.rest.request')
+ def test_bearer_token_invalid_via_headers(self, request, OAuthAccessToken):
+ request.headers = {
+ 'Authorization': 'Bearer foo'
+ }
+ request.scheme = 'https'
+ self._patch_token(OAuthAccessToken)
+ OAuthAccessToken.query.get.return_value = None
+ r = self.api_post('/rest/p/test/wiki', access_token='foo')
+ assert_equal(r.status_int, 403)
+
+ @mock.patch('allura.controllers.rest.request')
+ @td.with_wiki
+ def test_bearer_token_valid_via_headers(self, request):
+ user = M.User.by_username('test-admin')
+ consumer_token = M.OAuthConsumerToken(
+ name='foo',
+ description='foo app',
+ )
+ request_token = M.OAuthRequestToken(
+ consumer_token_id=consumer_token._id,
+ user_id=user._id,
+ callback='manual',
+ validation_pin=h.nonce(20),
+ is_bearer=True,
+ )
+ access_token = M.OAuthAccessToken(
+ consumer_token_id=consumer_token._id,
+ request_token_id=request_token._id,
+ user_id=user._id,
+ is_bearer=True,
+ )
+ ThreadLocalODMSession.flush_all()
+ token = access_token.api_key
+ request.headers = {
+ 'Authorization': 'Bearer {}'.format(token)
+ }
+ request.scheme = 'https'
+ r = self.api_post('/rest/p/test/wiki', access_token='foo')
+ assert_equal(r.status_int, 200)
+
+ def test_bad_path(self):
+ r = self.api_post('/rest/1/test/wiki/')
+ assert r.status_int == 404
+ r = self.api_post('/rest/p/1223/wiki/')
+ assert r.status_int == 404
+ r = self.api_post('/rest/p/test/12wiki/')
+ assert r.status_int == 404
+
+ def test_no_api(self):
+ r = self.api_post('/rest/p/test/admin/')
+ assert r.status_int == 404
+
+ @td.with_wiki
+ def test_project_ping(self):
+ r = self.api_get('/rest/p/test/wiki/Home/')
+ assert r.status_int == 200
+ assert r.json['title'] == 'Home', r.json
+
+ @td.with_tool('test/sub1', 'Wiki', 'wiki')
+ def test_subproject_ping(self):
+ r = self.api_get('/rest/p/test/sub1/wiki/Home/')
+ assert r.status_int == 200
+ assert r.json['title'] == 'Home', r.json
+
+ def test_project_code(self):
+ r = self.api_get('/rest/p/test/')
+ assert r.status_int == 200
+
+ def test_project_data(self):
+ r = self.api_get('/rest/p/test/')
+ assert_equal(r.json['shortname'], 'test')
+ assert_equal(r.json['name'], 'Test Project')
+ assert_equal(len(r.json['developers']), 1)
+ admin_dev = r.json['developers'][0]
+ assert_equal(admin_dev['username'], 'test-admin')
+ assert_equal(admin_dev['name'], 'Test Admin')
+ assert_equal(admin_dev['url'], 'http://localhost/u/test-admin/')
+
+ @td.with_tool('test', 'Tickets', 'bugs')
+ @td.with_tool('test', 'Tickets', 'private-bugs')
+ def test_project_data_tools(self):
+ # Deny anonymous to see 'private-bugs' tool
+ role = M.ProjectRole.by_name('*anonymous')._id
+ read_permission = M.ACE.allow(role, 'read')
+ app = M.Project.query.get(
+ shortname='test').app_instance('private-bugs')
+ if read_permission in app.config.acl:
+ app.config.acl.remove(read_permission)
+
+ # admin sees both 'Tickets' tools
+ r = self.api_get('/rest/p/test/')
+ assert_equal(r.json['shortname'], 'test')
+ tool_mounts = [t['mount_point'] for t in r.json['tools']]
+ assert_in('bugs', tool_mounts)
+ assert_in('private-bugs', tool_mounts)
+
+ # anonymous sees only non-private tool
+ r = self.app.get('/rest/p/test/',
+ extra_environ={'username': '*anonymous'})
+ assert_equal(r.json['shortname'], 'test')
+ tool_mounts = [t['mount_point'] for t in r.json['tools']]
+ assert_in('bugs', tool_mounts)
+ assert_not_in('private-bugs', tool_mounts)
+
+ def test_unicode(self):
+ self.app.post(
+ '/wiki/tést/update',
+ params={
+ 'title': 'tést',
+ 'text': 'sometext',
+ 'labels': '',
+ 'viewable_by-0.id': 'all'})
+ r = self.api_get('/rest/p/test/wiki/tést/')
+ assert r.status_int == 200
+ assert r.json['title'].encode('utf-8') == 'tést', r.json
+
+ @td.with_wiki
+ def test_deny_access(self):
+ wiki = M.Project.query.get(shortname='test').app_instance('wiki')
+ anon_read_perm = M.ACE.allow(
+ M.ProjectRole.by_name('*anonymous')._id, 'read')
+ auth_read_perm = M.ACE.allow(
+ M.ProjectRole.by_name('*authenticated')._id, 'read')
+ acl = wiki.config.acl
+ if anon_read_perm in acl:
+ acl.remove(anon_read_perm)
+ if auth_read_perm in acl:
+ acl.remove(auth_read_perm)
+ self.app.get('/rest/p/test/wiki/Home/',
+ extra_environ={'username': '*anonymous'},
+ status=401)
+ self.app.get('/rest/p/test/wiki/Home/',
+ extra_environ={'username': 'test-user-0'},
+ status=403)
+
+ def test_index(self):
+ eps = {
+ 'site_stats': {
+ 'foo_24hr': lambda: 42,
+ 'bar_24hr': lambda: 84,
+ 'qux_24hr': lambda: 0,
+ },
+ }
+ with mock.patch.dict(g.entry_points, eps):
+ response = self.app.get('/rest/')
+ assert_equal(response.json, {
+ 'site_stats': {
+ 'foo_24hr': 42,
+ 'bar_24hr': 84,
+ 'qux_24hr': 0,
+ },
+ })
+
+ def test_name_validation(self):
+ r = self.api_get('/rest/p/test/')
+ assert r.status_int == 200
+ with mock.patch('allura.lib.plugin.ProjectRegistrationProvider') as Provider:
+ Provider.get().shortname_validator.to_python.side_effect = Invalid(
+ 'name', 'value', {})
+ r = self.api_get('/rest/p/test/')
+ assert r.status_int == 404
+
+class TestDoap(TestRestApiBase):
+ validate_skip = True
+ ns = '{http://usefulinc.com/ns/doap#}'
+ ns_sf = '{http://sourceforge.net/api/sfelements.rdf#}'
+ foaf = '{http://xmlns.com/foaf/0.1/}'
+ dc = '{http://dublincore.org/documents/dcmi-namespace/}'
+ rdf = '{http://www.w3.org/1999/02/22-rdf-syntax-ns#}'
+
+ def test_project_data(self):
+ project = M.Project.query.get(shortname='test')
+ project.summary = 'A Summary'
+ project.short_description = 'A Short Description'
+ ThreadLocalODMSession.flush_all()
+ r = self.app.get('/rest/p/test?doap')
+ assert_equal(r.content_type, 'application/rdf+xml')
+ p = r.xml.find(self.ns + 'Project')
+ assert_equal(p.attrib[self.rdf + 'about'], 'http://localhost/rest/p/test?doap#')
+ assert_equal(p.find(self.ns + 'name').text, 'test')
+ assert_equal(p.find(self.dc + 'title').text, 'Test Project')
+ assert_equal(p.find(self.ns_sf + 'private').text, '0')
+ assert_equal(p.find(self.ns + 'shortdesc').text, 'A Summary')
+ assert_equal(p.find(self.ns + 'description').text, 'A Short Description')
+ assert_equal(p.find(self.ns + 'created').text, project._id.generation_time.strftime('%Y-%m-%d'))
+
+ maintainers = p.findall(self.ns + 'maintainer')
+ assert_equal(len(maintainers), 1)
+ user = maintainers[0].find(self.foaf + 'Person')
+ assert_equal(user.find(self.foaf + 'name').text, 'Test Admin')
+ assert_equal(user.find(self.foaf + 'nick').text, 'test-admin')
+ assert_equal(list(user.find(self.foaf + 'homepage').items())[0][1],
+ 'http://localhost/u/test-admin/')
+
+ @td.with_tool('test', 'Tickets', 'bugs')
+ @td.with_tool('test', 'Tickets', 'private-bugs')
+ def test_project_data_tools(self):
+ # Deny anonymous to see 'private-bugs' tool
+ role = M.ProjectRole.by_name('*anonymous')._id
+ read_permission = M.ACE.allow(role, 'read')
+ app = M.Project.query.get(
+ shortname='test').app_instance('private-bugs')
+ if read_permission in app.config.acl:
+ app.config.acl.remove(read_permission)
+
+ # admin sees both 'Tickets' tools
+ r = self.app.get('/rest/p/test?doap')
+ p = r.xml.find(self.ns + 'Project')
+ tools = p.findall(self.ns_sf + 'feature')
+ tools = [(t.find(self.ns_sf + 'Feature').find(self.ns + 'name').text,
+ list(t.find(self.ns_sf + 'Feature').find(self.foaf + 'page').items())[0][1])
+ for t in tools]
+ assert_in(('Tickets', 'http://localhost/p/test/bugs/'), tools)
+ assert_in(('Tickets', 'http://localhost/p/test/private-bugs/'), tools)
+
+ # anonymous sees only non-private tool
+ r = self.app.get('/rest/p/test?doap',
+ extra_environ={'username': '*anonymous'})
+ p = r.xml.find(self.ns + 'Project')
+ tools = p.findall(self.ns_sf + 'feature')
+ tools = [(t.find(self.ns_sf + 'Feature').find(self.ns + 'name').text,
+ list(t.find(self.ns_sf + 'Feature').find(self.foaf + 'page').items())[0][1])
+ for t in tools]
+ assert_in(('Tickets', 'http://localhost/p/test/bugs/'), tools)
+ assert_not_in(('Tickets', 'http://localhost/p/test/private-bugs/'), tools)
+
+
+class TestUserProfile(TestRestApiBase):
+ @td.with_user_project('test-admin')
+ def test_profile_data(self):
+ r = self.app.get('/rest/u/test-admin/profile/')
+ assert_equal(r.content_type, 'application/json')
+ json = r.json
+ assert_equal(json['username'], 'test-admin')
+ assert_equal(json['name'], 'Test Admin')
+ assert_in('availability', json)
+ assert_in('joined', json)
+ assert_in('localization', json)
+ assert_in('projects', json)
+ assert_in('sex', json)
+ assert_in('skills', json)
+ assert_in('skypeaccount', json)
+ assert_in('socialnetworks', json)
+ assert_in('telnumbers', json)
+ assert_in('webpages', json)
http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/functional/test_root.py
----------------------------------------------------------------------
diff --git a/tests/functional/test_root.py b/tests/functional/test_root.py
new file mode 100644
index 0000000..e2aac1f
--- /dev/null
+++ b/tests/functional/test_root.py
@@ -0,0 +1,186 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Functional test suite for the root controller.
+
+This is an example of how functional tests can be written for controllers.
+
+As opposed to a unit-test, which test a small unit of functionality,
+functional tests exercise the whole application and its WSGI stack.
+
+Please read http://pythonpaste.org/webtest/ for more information.
+
+"""
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
+from pylons import tmpl_context as c
+from nose.tools import assert_equal
+from ming.orm.ormsession import ThreadLocalORMSession
+import mock
+from IPython.testing.decorators import module_not_available, skipif
+
+from allura.tests import decorators as td
+from allura.tests import TestController
+from allura import model as M
+from allura.lib.helpers import push_config
+from alluratest.controller import setup_trove_categories
+
+
+class TestRootController(TestController):
+
+ def setUp(self):
+ super(TestRootController, self).setUp()
+ n_adobe = M.Neighborhood.query.get(name='Adobe')
+ assert n_adobe
+ u_admin = M.User.query.get(username='test-admin')
+ assert u_admin
+ n_adobe.register_project('adobe-2', u_admin)
+
+ def test_index(self):
+ response = self.app.get('/')
+ assert_equal(response.html.find('h2', {'class': 'dark title'}).contents[
+ 0].strip(), 'All Neighborhoods')
+ nbhds = response.html.findAll('td', {'class': 'nbhd'})
+ assert nbhds[0].find('a').get('href') == '/adobe/'
+ cat_links = response.html.find('div', {'id': 'sidebar'}).findAll('li')
+ assert len(cat_links) == 4
+ assert cat_links[0].find('a').get('href') == '/browse/clustering'
+ assert cat_links[0].find('a').find('span').string == 'Clustering'
+
+ def test_sidebar_escaping(self):
+ # use this as a convenient way to get something in the sidebar
+ M.ProjectCategory(name='test-xss', label='<script>alert(1)</script>')
+ ThreadLocalORMSession.flush_all()
+
+ response = self.app.get('/')
+ # inject it into the sidebar data
+ content = str(response.html.find('div', {'id': 'content_base'}))
+ assert '<script>' not in content
+ assert '<script>' in content
+
+ def test_strange_accept_headers(self):
+ hdrs = [
+ 'text/plain;text/html;text/*',
+ 'text/html,application/xhtml+xml,application/xml;q=0.9;text/plain;q=0.8,image/png,*/*;q=0.5']
+ for hdr in hdrs:
+ # malformed headers used to return 500, just make sure they don't
+ # now
+ self.app.get('/', headers=dict(Accept=hdr), validate_skip=True)
+
+ def test_project_browse(self):
+ com_cat = M.ProjectCategory.query.find(
+ dict(label='Communications')).first()
+ M.Project.query.find(dict(shortname='adobe-1')
+ ).first().category_id = com_cat._id
+ response = self.app.get('/browse')
+ assert len(
+ response.html.findAll('a', {'href': '/adobe/adobe-1/'})) == 1
+ assert len(
+ response.html.findAll('a', {'href': '/adobe/adobe-2/'})) == 1
+ response = self.app.get('/browse/communications')
+ assert len(
+ response.html.findAll('a', {'href': '/adobe/adobe-1/'})) == 1
+ assert len(
+ response.html.findAll('a', {'href': '/adobe/adobe-2/'})) == 0
+ response = self.app.get('/browse/communications/fax')
+ assert len(
+ response.html.findAll('a', {'href': '/adobe/adobe-1/'})) == 0
+ assert len(
+ response.html.findAll('a', {'href': '/adobe/adobe-2/'})) == 0
+
+ def test_neighborhood_home(self):
+ setup_trove_categories()
+ # Install home app
+ nb = M.Neighborhood.query.get(name='Adobe')
+ p = nb.neighborhood_project
+ with push_config(c, user=M.User.query.get(username='test-admin')):
+ p.install_app('home', 'home', 'Home', ordinal=0)
+
+ response = self.app.get('/adobe/')
+ projects = response.html.findAll('div', {'class': 'border card'})
+ assert len(projects) == 2
+ cat_links = response.html.find(
+ 'div', {'id': 'sidebar'}).findAll('ul')[1].findAll('li')
+ assert len(cat_links) == 3, cat_links
+ assert cat_links[0].find('a').get('href') == '/adobe/browse/clustering'
+ assert cat_links[0].find('a').find('span').string == 'Clustering'
+
+ def test_neighborhood_project_browse(self):
+ com_cat = M.ProjectCategory.query.find(
+ dict(label='Communications')).first()
+ fax_cat = M.ProjectCategory.query.find(dict(label='Fax')).first()
+ M.Project.query.find(dict(shortname='adobe-1')
+ ).first().category_id = com_cat._id
+ M.Project.query.find(dict(shortname='adobe-2')
+ ).first().category_id = fax_cat._id
+ response = self.app.get('/adobe/browse')
+ assert len(
+ response.html.findAll('a', {'href': '/adobe/adobe-1/'})) == 1
+ assert len(
+ response.html.findAll('a', {'href': '/adobe/adobe-2/'})) == 1
+ response = self.app.get('/adobe/browse/communications')
+ assert len(
+ response.html.findAll('a', {'href': '/adobe/adobe-1/'})) == 1
+ assert len(
+ response.html.findAll('a', {'href': '/adobe/adobe-2/'})) == 1
+ response = self.app.get('/adobe/browse/communications/fax')
+ assert len(
+ response.html.findAll('a', {'href': '/adobe/adobe-1/'})) == 0
+ assert len(
+ response.html.findAll('a', {'href': '/adobe/adobe-2/'})) == 1
+
+ @td.with_wiki
+ def test_markdown_to_html(self):
+ n = M.Neighborhood.query.get(name='Projects')
+ r = self.app.get(
+ '/nf/markdown_to_html?markdown=*aaa*bb[wiki:Home]&project=test&app=bugs&neighborhood=%s' % n._id, validate_chunk=True)
+ assert '<p><em>aaa</em>bb<a class="alink" href="/p/test/wiki/Home">[wiki:Home]</a></p>' in r, r
+
+ def test_slash_redirect(self):
+ self.app.get('/p', status=301)
+ self.app.get('/p/', status=302)
+
+ @skipif(module_not_available('newrelic'))
+ def test_newrelic_set_transaction_name(self):
+ from allura.controllers.project import NeighborhoodController
+ with mock.patch('newrelic.agent.callable_name') as callable_name,\
+ mock.patch('newrelic.agent.set_transaction_name') as set_transaction_name:
+ callable_name.return_value = 'foo'
+ self.app.get('/p/')
+ arg = callable_name.call_args[0][0]
+ assert_equal(arg.undecorated,
+ NeighborhoodController.index.undecorated)
+ set_transaction_name.assert_called_with('foo')
+
+
+class TestRootWithSSLPattern(TestController):
+ def setUp(self):
+ with td.patch_middleware_config({'force_ssl.pattern': '^/auth'}):
+ super(TestRootWithSSLPattern, self).setUp()
+
+ def test_no_weird_ssl_redirect_for_error_document(self):
+ # test a 404, same functionality as a 500 from an error
+ r = self.app.get('/auth/asdfasdf',
+ extra_environ={'wsgi.url_scheme': 'https'},
+ status=404)
+ assert '302 Found' not in r.body, r.body
+ assert '/error/document' not in r.body, r.body
http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/functional/test_search.py
----------------------------------------------------------------------
diff --git a/tests/functional/test_search.py b/tests/functional/test_search.py
new file mode 100644
index 0000000..50c0cfb
--- /dev/null
+++ b/tests/functional/test_search.py
@@ -0,0 +1,37 @@
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from mock import patch
+from allura.tests import TestController
+
+
+class TestSearch(TestController):
+
+ @patch('allura.lib.search.search')
+ def test_global_search_controller(self, search):
+ self.app.get('/gsearch/')
+ assert not search.called, search.called
+ self.app.get('/gsearch/', params=dict(q='Root'))
+ assert search.called, search.called
+
+ def test_project_search_controller(self):
+ self.app.get('/search/')
+ self.app.get('/search/', params=dict(q='Root'))
http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/functional/test_site_admin.py
----------------------------------------------------------------------
diff --git a/tests/functional/test_site_admin.py b/tests/functional/test_site_admin.py
new file mode 100644
index 0000000..70436e3
--- /dev/null
+++ b/tests/functional/test_site_admin.py
@@ -0,0 +1,575 @@
+# coding: utf-8
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
+import json
+import datetime as dt
+
+from mock import patch, MagicMock
+from nose.tools import assert_equal, assert_in, assert_not_in
+from ming.odm import ThreadLocalORMSession
+from pylons import tmpl_context as c
+from tg import config
+from bson import ObjectId
+
+from allura import model as M
+from allura.tests import TestController
+from allura.tests import decorators as td
+from allura.lib import helpers as h
+from allura.lib.decorators import task
+from allura.lib.plugin import LocalAuthenticationProvider
+
+
+class TestSiteAdmin(TestController):
+
+ def test_access(self):
+ r = self.app.get('/nf/admin/', extra_environ=dict(
+ username='test-user'), status=403)
+
+ r = self.app.get('/nf/admin/', extra_environ=dict(
+ username='*anonymous'), status=302)
+ r = r.follow()
+ assert 'Login' in r
+
+ def test_home(self):
+ r = self.app.get('/nf/admin/', extra_environ=dict(
+ username='root'))
+ assert 'Site Admin Home' in r
+
+ def test_stats(self):
+ r = self.app.get('/nf/admin/stats/', extra_environ=dict(
+ username='root'))
+ assert 'Forge Site Admin' in r.html.find(
+ 'h2', {'class': 'dark title'}).contents[0]
+ stats_table = r.html.find('table')
+ cells = stats_table.findAll('td')
+ assert cells[0].contents[0] == 'Adobe', cells[0].contents[0]
+
+ def test_tickets_access(self):
+ self.app.get('/nf/admin/api_tickets', extra_environ=dict(
+ username='test-user'), status=403)
+
+ def test_new_projects_access(self):
+ self.app.get('/nf/admin/new_projects', extra_environ=dict(
+ username='test_user'), status=403)
+ r = self.app.get('/nf/admin/new_projects', extra_environ=dict(
+ username='*anonymous'), status=302).follow()
+ assert 'Login' in r
+
+ def test_new_projects(self):
+ r = self.app.get('/nf/admin/new_projects', extra_environ=dict(
+ username='root'))
+ headers = r.html.find('table').findAll('th')
+ assert headers[1].contents[0] == 'Created'
+ assert headers[2].contents[0] == 'Shortname'
+ assert headers[3].contents[0] == 'Name'
+ assert headers[4].contents[0] == 'Short description'
+ assert headers[5].contents[0] == 'Summary'
+ assert headers[6].contents[0] == 'Homepage'
+ assert headers[7].contents[0] == 'Admins'
+
+ def test_new_projects_deleted_projects(self):
+ '''Deleted projects should not be visible here'''
+ r = self.app.get('/nf/admin/new_projects', extra_environ=dict(
+ username='root'))
+ count = len(r.html.find('table').findAll('tr'))
+ p = M.Project.query.get(shortname='test')
+ p.deleted = True
+ ThreadLocalORMSession.flush_all()
+ r = self.app.get('/nf/admin/new_projects', extra_environ=dict(
+ username='root'))
+ assert_equal(len(r.html.find('table').findAll('tr')), count - 1)
+
+ def test_new_projects_daterange_filtering(self):
+ r = self.app.get('/nf/admin/new_projects', extra_environ=dict(
+ username='root'))
+ count = len(r.html.find('table').findAll('tr'))
+ assert_equal(count, 7)
+
+ filtr = r.forms[0]
+ filtr['start-dt'] = '2000/01/01 10:10:10'
+ filtr['end-dt'] = '2000/01/01 09:09:09'
+ r = filtr.submit()
+ count = len(r.html.find('table').findAll('tr'))
+ assert_equal(count, 1) # only row with headers - no results
+
+ def test_reclone_repo_access(self):
+ r = self.app.get('/nf/admin/reclone_repo', extra_environ=dict(
+ username='*anonymous'), status=302).follow()
+ assert 'Login' in r
+
+ def test_reclone_repo(self):
+ r = self.app.get('/nf/admin/reclone_repo')
+ assert 'Reclone repository' in r
+
+ def test_reclone_repo_default_value(self):
+ r = self.app.get('/nf/admin/reclone_repo')
+ assert 'value="p"' in r
+
+ def test_task_list(self):
+ r = self.app.get('/nf/admin/task_manager',
+ extra_environ=dict(username='*anonymous'), status=302)
+ import math
+ M.MonQTask.post(math.ceil, (12.5,))
+ r = self.app.get('/nf/admin/task_manager?page_num=1')
+ assert 'math.ceil' in r, r
+
+ def test_task_view(self):
+ import math
+ task = M.MonQTask.post(math.ceil, (12.5,))
+ url = '/nf/admin/task_manager/view/%s' % task._id
+ r = self.app.get(
+ url, extra_environ=dict(username='*anonymous'), status=302)
+ r = self.app.get(url)
+ assert 'math.ceil' in r, r
+
+ def test_task_new(self):
+ r = self.app.get('/nf/admin/task_manager/new')
+ assert 'New Task' in r, r
+
+ def test_task_create(self):
+ project = M.Project.query.get(shortname='test')
+ app = project.app_instance('admin')
+ user = M.User.by_username('root')
+
+ task_args = dict(
+ args=['foo'],
+ kwargs=dict(bar='baz'))
+
+ r = self.app.post('/nf/admin/task_manager/create', params=dict(
+ task='allura.tests.functional.test_site_admin.test_task',
+ task_args=json.dumps(task_args),
+ user='root',
+ path='/p/test/admin',
+ ), status=302)
+ task = next(M.MonQTask.query.find({}).sort('_id', -1))
+ assert str(task._id) in r.location
+ assert task.context['project_id'] == project._id
+ assert task.context['app_config_id'] == app.config._id
+ assert task.context['user_id'] == user._id
+ assert task.args == task_args['args']
+ assert task.kwargs == task_args['kwargs']
+
+ def test_task_doc(self):
+ r = self.app.get('/nf/admin/task_manager/task_doc', params=dict(
+ task_name='allura.tests.functional.test_site_admin.test_task'))
+ assert json.loads(r.body)['doc'] == 'test_task doc string'
+
+
+class TestProjectsSearch(TestController):
+
+ TEST_HIT = MagicMock(hits=1, docs=[{
+ 'name_s': 'Test Project',
+ 'is_nbhd_project_b': False,
+ 'is_root_b': True,
+ 'title': ['Project Test Project'],
+ 'deleted_b': False,
+ 'shortname_s': 'test',
+ 'private_b': False,
+ 'url_s': 'http://localhost:8080/p/test/',
+ 'neighborhood_id_s': '53ccf6e6100d2b0741746c66',
+ 'removal_changed_date_dt': '2014-07-21T11:18:00.087Z',
+ 'registration_dt': '2014-07-21T11:18:00Z',
+ 'type_s': 'Project',
+ '_version_': 1474236502200287232,
+ 'neighborhood_name_s': 'Projects',
+ 'id': 'allura/model/project/Project#53ccf6e8100d2b0741746e9f',
+ }])
+
+ def setUp(self):
+ super(TestProjectsSearch, self).setUp()
+ # Create project that matches TEST_HIT id
+ _id = ObjectId('53ccf6e8100d2b0741746e9f')
+ p = M.Project.query.get(_id=_id)
+ if not p:
+ M.Project(
+ _id=_id,
+ neighborhood_id=M.Neighborhood.query.get(url_prefix='/u/')._id,
+ shortname='test-project',
+ )
+ ThreadLocalORMSession().flush_all()
+
+ @patch('allura.controllers.site_admin.search')
+ def test_default_fields(self, search):
+ search.site_admin_search.return_value = self.TEST_HIT
+ r = self.app.get('/nf/admin/search_projects?q=fake&f=shortname')
+ options = [o['value'] for o in r.html.findAll('option')]
+ assert_equal(options, ['shortname', 'name', '__custom__'])
+ ths = [th.text for th in r.html.findAll('th')]
+ assert_equal(ths, ['Short name', 'Full name', 'Registered', 'Deleted?', 'Details'])
+
+ @patch('allura.controllers.site_admin.search')
+ def test_additional_fields(self, search):
+ search.site_admin_search.return_value = self.TEST_HIT
+ with h.push_config(config, **{'search.project.additional_search_fields': 'private, url',
+ 'search.project.additional_display_fields': 'url'}):
+ r = self.app.get('/nf/admin/search_projects?q=fake&f=shortname')
+ options = [o['value'] for o in r.html.findAll('option')]
+ assert_equal(options, ['shortname', 'name', 'private', 'url', '__custom__'])
+ ths = [th.text for th in r.html.findAll('th')]
+ assert_equal(ths, ['Short name', 'Full name', 'Registered', 'Deleted?', 'url', 'Details'])
+
+
+class TestUsersSearch(TestController):
+
+ TEST_HIT = MagicMock(hits=1, docs=[{
+ '_version_': 1478773871277506560,
+ 'disabled_b': False,
+ 'pending_b': False,
+ 'display_name_t': 'Darth Vader',
+ 'id': 'allura/model/auth/User#540efdf2100d2b1483155d39',
+ 'last_access_login_date_dt': '2014-09-09T13:17:40.176Z',
+ 'last_access_login_ip_s': '10.0.2.2',
+ 'last_access_login_ua_t': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36',
+ 'last_access_session_date_dt': '2014-09-09T13:17:40.33Z',
+ 'last_access_session_ip_s': '10.0.2.2',
+ 'last_access_session_ua_t': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.94 Safari/537.36',
+ 'last_password_updated_dt': '2014-09-09T13:17:38.857Z',
+ 'localization_s': 'None/None',
+ 'sex_s': 'Unknown',
+ 'title': ['User darth'],
+ 'type_s': 'User',
+ 'url_s': '/u/darth/',
+ 'user_registration_date_dt': '2014-09-09T13:17:38Z',
+ 'username_s': 'darth'}])
+
+ def setUp(self):
+ super(TestUsersSearch, self).setUp()
+ # Create user that matches TEST_HIT id
+ _id = ObjectId('540efdf2100d2b1483155d39')
+ u = M.User.query.get(_id=_id)
+ if not u:
+ M.User(_id=_id, username='darth')
+ ThreadLocalORMSession().flush_all()
+
+ @patch('allura.controllers.site_admin.search')
+ def test_default_fields(self, search):
+ search.site_admin_search.return_value = self.TEST_HIT
+ r = self.app.get('/nf/admin/search_users?q=fake&f=username')
+ options = [o['value'] for o in r.html.findAll('option')]
+ assert_equal(options, ['username', 'display_name', '__custom__'])
+ ths = [th.text for th in r.html.findAll('th')]
+ assert_equal(ths, ['Username', 'Display name', 'Email', 'Registered',
+ 'Status', 'Details'])
+
+ @patch('allura.controllers.site_admin.search')
+ def test_additional_fields(self, search):
+ search.site_admin_search.return_value = self.TEST_HIT
+ with h.push_config(config, **{'search.user.additional_search_fields': 'email_addresses, url',
+ 'search.user.additional_display_fields': 'url'}):
+ r = self.app.get('/nf/admin/search_users?q=fake&f=username')
+ options = [o['value'] for o in r.html.findAll('option')]
+ assert_equal(options, ['username', 'display_name', 'email_addresses', 'url', '__custom__'])
+ ths = [th.text for th in r.html.findAll('th')]
+ assert_equal(ths, ['Username', 'Display name', 'Email', 'Registered',
+ 'Status', 'url', 'Details'])
+
+
+class TestUserDetails(TestController):
+
+ def test_404(self):
+ self.app.get('/nf/admin/user/does-not-exist/', status=404)
+
+ def test_general_info(self):
+ user = M.User.by_username('test-admin')
+ user.registration_date = lambda: dt.datetime(2014, 9, 1, 9, 9, 9)
+ user.last_access = {'login_date': dt.datetime(2014, 9, 2, 6, 6, 6),
+ 'login_ua': 'browser of the future 1.0',
+ 'login_ip': '8.8.8.8',
+ 'session_date': dt.datetime(2014, 9, 12, 6, 6, 6),
+ 'session_ua': 'browser of the future 1.1',
+ 'session_ip': '7.7.7.7'}
+ r = self.app.get('/nf/admin/user/test-admin/')
+ # general info
+ assert_in('Username: test-admin', r)
+ assert_in('Full name: Test Admin', r)
+ assert_in('Registered: 2014-09-01 09:09:09', r)
+ # session info
+ assert_in('Date: 2014-09-02 06:06:06', r)
+ assert_in('IP: 8.8.8.8', r)
+ assert_in('UA: browser of the future 1.0', r)
+ assert_in('Date: 2014-09-12', r)
+ assert_in('IP: 7.7.7.7', r)
+ assert_in('UA: browser of the future 1.1', r)
+ # list of projects
+ projects = r.html.findAll('fieldset')[-1]
+ projects = [e.getText() for e in projects.findAll('li')]
+ assert_in('Test 2–Admin', projects)
+ assert_in('Test Project–Admin', projects)
+ assert_in('Adobe project 1–Admin', projects)
+
+ @patch('allura.model.auth.request')
+ @patch('allura.lib.helpers.request')
+ def test_audit_log(self, req1, req2):
+ req1.url = req2.url = 'http://host.domain/path/'
+ c.user = M.User.by_username('test-user-1')
+ h.auditlog_user('test activity user 1')
+ h.auditlog_user('test activity user 2', user=M.User.by_username('test-user-2'))
+ r = self.app.get('/nf/admin/user/test-admin')
+ assert_in('Add comment', r)
+ assert_not_in('test activity', r)
+ r = self.app.get('/nf/admin/user/test-user-1')
+ assert_in('test activity user 1', r)
+ assert_not_in('test activity user 2', r)
+ r = self.app.get('/nf/admin/user/test-user-2')
+ assert_not_in('test activity user 1', r)
+ assert_in('test activity user 2', r)
+
+ def test_add_audit_trail_entry_access(self):
+ self.app.get('/nf/admin/user/add_audit_log_entry', status=404) # GET is not allowed
+ r = self.app.post('/nf/admin/user/add_audit_log_entry',
+ extra_environ={'username': '*anonymous'},
+ status=302)
+ assert_equal(r.location, 'http://localhost/auth/')
+
+ def test_add_comment(self):
+ r = self.app.get('/nf/admin/user/test-user')
+ assert_not_in('Comment by test-admin: I was hêre!', r)
+ form = r.forms[4]
+ assert_equal(form['username'].value, 'test-user')
+ form['comment'] = 'I was hêre!'
+ r = form.submit()
+ assert_in('Comment added', self.webflash(r))
+ r = self.app.get('/nf/admin/user/test-user')
+ assert_in('Comment by test-admin: I was hêre!', r)
+
+ def test_disable_user(self):
+ # user was not pending
+ assert_equal(M.User.by_username('test-user-3').disabled, False)
+ assert_equal(M.User.by_username('test-user-3').pending, False)
+ r = self.app.get('/nf/admin/user/test-user-3')
+ form = r.forms[0]
+ assert_equal(form['username'].value, 'test-user-3')
+ assert_equal(form['status'].value, 'enable')
+ form['status'].value = 'disable'
+ with td.audits('Account disabled', user=True):
+ r = form.submit()
+ assert_equal(M.AuditLog.query.find().count(), 1)
+ assert_in('User disabled', self.webflash(r))
+ assert_equal(M.User.by_username('test-user-3').disabled, True)
+ assert_equal(M.User.by_username('test-user-3').pending, False)
+
+ # user was pending
+ user = M.User.by_username('test-user-3')
+ user.disabled = False
+ user.pending = True
+ ThreadLocalORMSession.flush_all()
+ assert_equal(M.User.by_username('test-user-3').disabled, False)
+ assert_equal(M.User.by_username('test-user-3').pending, True)
+ r = self.app.get('/nf/admin/user/test-user-3')
+ form = r.forms[0]
+ assert_equal(form['username'].value, 'test-user-3')
+ assert_equal(form['status'].value, 'pending')
+ form['status'].value = 'disable'
+ with td.audits('Account disabled', user=True):
+ r = form.submit()
+ assert_equal(M.AuditLog.query.find().count(), 1)
+ assert_in('User disabled', self.webflash(r))
+ assert_equal(M.User.by_username('test-user-3').disabled, True)
+ assert_equal(M.User.by_username('test-user-3').pending, True)
+
+ def test_enable_user(self):
+ # user was not pending
+ user = M.User.by_username('test-user-3')
+ user.disabled = True
+ ThreadLocalORMSession.flush_all()
+ assert_equal(M.User.by_username('test-user-3').disabled, True)
+ assert_equal(M.User.by_username('test-user-3').pending, False)
+ r = self.app.get('/nf/admin/user/test-user-3')
+ form = r.forms[0]
+ assert_equal(form['username'].value, 'test-user-3')
+ assert_equal(form['status'].value, 'disable')
+ form['status'].value = 'enable'
+ with td.audits('Account enabled', user=True):
+ r = form.submit()
+ assert_equal(M.AuditLog.query.find().count(), 1)
+ assert_in('User enabled', self.webflash(r))
+ assert_equal(M.User.by_username('test-user-3').disabled, False)
+ assert_equal(M.User.by_username('test-user-3').pending, False)
+
+ # user was pending
+ user = M.User.by_username('test-user-3')
+ user.disabled = False
+ user.pending = True
+ ThreadLocalORMSession.flush_all()
+ assert_equal(M.User.by_username('test-user-3').disabled, False)
+ assert_equal(M.User.by_username('test-user-3').pending, True)
+ r = self.app.get('/nf/admin/user/test-user-3')
+ form = r.forms[0]
+ assert_equal(form['username'].value, 'test-user-3')
+ assert_equal(form['status'].value, 'pending')
+ form['status'].value = 'enable'
+ with td.audits('Account enabled', user=True):
+ r = form.submit()
+ assert_equal(M.AuditLog.query.find().count(), 1)
+ assert_in('User enabled', self.webflash(r))
+ assert_equal(M.User.by_username('test-user-3').disabled, False)
+ assert_equal(M.User.by_username('test-user-3').pending, False)
+
+ # user was pending and disabled
+ user = M.User.by_username('test-user-3')
+ user.disabled = True
+ user.pending = True
+ ThreadLocalORMSession.flush_all()
+ assert_equal(M.User.by_username('test-user-3').disabled, True)
+ assert_equal(M.User.by_username('test-user-3').pending, True)
+ r = self.app.get('/nf/admin/user/test-user-3')
+ form = r.forms[0]
+ assert_equal(form['username'].value, 'test-user-3')
+ assert_equal(form['status'].value, 'disable')
+ form['status'].value = 'enable'
+ with td.audits('Account enabled', user=True):
+ r = form.submit()
+ assert_equal(M.AuditLog.query.find().count(), 1)
+ assert_in('User enabled', self.webflash(r))
+ assert_equal(M.User.by_username('test-user-3').disabled, False)
+ assert_equal(M.User.by_username('test-user-3').pending, False)
+
+ def test_set_pending(self):
+ # user was disabled
+ user = M.User.by_username('test-user-3')
+ user.disabled = True
+ ThreadLocalORMSession.flush_all()
+ assert_equal(M.User.by_username('test-user-3').disabled, True)
+ assert_equal(M.User.by_username('test-user-3').pending, False)
+ r = self.app.get('/nf/admin/user/test-user-3')
+ form = r.forms[0]
+ assert_equal(form['username'].value, 'test-user-3')
+ assert_equal(form['status'].value, 'disable')
+ form['status'].value = 'pending'
+ with td.audits('Account changed to pending', user=True):
+ r = form.submit()
+ assert_equal(M.AuditLog.query.find().count(), 1)
+ assert_in('Set user status to pending', self.webflash(r))
+ assert_equal(M.User.by_username('test-user-3').disabled, False)
+ assert_equal(M.User.by_username('test-user-3').pending, True)
+
+ # user was enabled
+ user = M.User.by_username('test-user-3')
+ user.pending = False
+ user.disabled = False
+ ThreadLocalORMSession.flush_all()
+ assert_equal(M.User.by_username('test-user-3').disabled, False)
+ assert_equal(M.User.by_username('test-user-3').pending, False)
+ r = self.app.get('/nf/admin/user/test-user-3')
+ form = r.forms[0]
+ assert_equal(form['username'].value, 'test-user-3')
+ assert_equal(form['status'].value, 'enable')
+ form['status'].value = 'pending'
+ with td.audits('Account changed to pending', user=True):
+ r = form.submit()
+ assert_equal(M.AuditLog.query.find().count(), 1)
+ assert_in('Set user status to pending', self.webflash(r))
+ assert_equal(M.User.by_username('test-user-3').disabled, False)
+ assert_equal(M.User.by_username('test-user-3').pending, True)
+
+ def test_emails(self):
+ # add test@example.com
+ with td.audits('New email address: test@example.com', user=True):
+ r = self.app.post('/nf/admin/user/update_emails', params={
+ 'username': 'test-user',
+ 'new_addr.addr': 'test@example.com',
+ 'new_addr.claim': 'Claim Address',
+ 'primary_addr': 'test@example.com'},
+ extra_environ=dict(username='test-admin'))
+ r = self.app.get('/nf/admin/user/test-user')
+ assert_in('test@example.com', r)
+ em = M.EmailAddress.get(email='test@example.com')
+ assert_equal(em.confirmed, True)
+ user = M.User.query.get(username='test-user')
+ assert_equal(user.get_pref('email_address'), 'test@example.com')
+
+ # add test2@example.com
+ with td.audits('New email address: test2@example.com', user=True):
+ r = self.app.post('/nf/admin/user/update_emails', params={
+ 'username': 'test-user',
+ 'new_addr.addr': 'test2@example.com',
+ 'new_addr.claim': 'Claim Address',
+ 'primary_addr': 'test@example.com'},
+ extra_environ=dict(username='test-admin'))
+ r = self.app.get('/nf/admin/user/test-user')
+ assert_in('test2@example.com', r)
+ em = M.EmailAddress.get(email='test2@example.com')
+ assert_equal(em.confirmed, True)
+ user = M.User.query.get(username='test-user')
+ assert_equal(user.get_pref('email_address'), 'test@example.com')
+
+ # change primary: test -> test2
+ with td.audits('Primary email changed: test@example.com => test2@example.com', user=True):
+ r = self.app.post('/nf/admin/user/update_emails', params={
+ 'username': 'test-user',
+ 'new_addr.addr': '',
+ 'primary_addr': 'test2@example.com'},
+ extra_environ=dict(username='test-admin'))
+ r = self.app.get('/nf/admin/user/test-user')
+ user = M.User.query.get(username='test-user')
+ assert_equal(user.get_pref('email_address'), 'test2@example.com')
+
+ # remove test2@example.com
+ with td.audits('Email address deleted: test2@example.com', user=True):
+ r = self.app.post('/nf/admin/user/update_emails', params={
+ 'username': 'test-user',
+ 'addr-1.ord': '1',
+ 'addr-2.ord': '2',
+ 'addr-2.delete': 'on',
+ 'new_addr.addr': '',
+ 'primary_addr': 'test2@example.com'},
+ extra_environ=dict(username='test-admin'))
+ r = self.app.get('/nf/admin/user/test-user')
+ user = M.User.query.get(username='test-user')
+ # test@example.com set as primary since test2@example.com is deleted
+ assert_equal(user.get_pref('email_address'), 'test@example.com')
+
+ @patch.object(LocalAuthenticationProvider, 'set_password')
+ def test_set_random_password(self, set_password):
+ with td.audits('Set random password', user=True, actor='test-admin'):
+ r = self.app.post('/nf/admin/user/set_random_password', params={'username': 'test-user'})
+ assert_in('Password is set', self.webflash(r))
+ set_password.assert_called_once()
+
+ @patch('allura.tasks.mail_tasks.sendsimplemail')
+ @patch('allura.lib.helpers.gen_message_id')
+ def test_send_password_reset_link(self, gen_message_id, sendmail):
+ user = M.User.by_username('test-user')
+ user.set_pref('email_address', 'test-user@example.org')
+ M.EmailAddress(email='test-user@example.org', confirmed=True, claimed_by_user_id=user._id)
+ ThreadLocalORMSession.flush_all()
+ with td.audits('Password recovery link sent to: test-user@example.org', user=True):
+ r = self.app.post('/nf/admin/user/send_password_reset_link', params={'username': 'test-user'})
+ hash = user.get_tool_data('AuthPasswordReset', 'hash')
+ text = '''Your username is test-user
+
+To reset your password on %s, please visit the following URL:
+
+%s/auth/forgotten_password/%s''' % (config['site_name'], config['base_url'], hash)
+ sendmail.post.assert_called_once_with(
+ toaddr='test-user@example.org',
+ fromaddr=config['forgemail.return_path'],
+ reply_to=config['forgemail.return_path'],
+ subject='Allura Password recovery',
+ message_id=gen_message_id(),
+ text=text)
+
+
+@task
+def test_task(*args, **kw):
+ """test_task doc string"""
+ pass
http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/functional/test_static.py
----------------------------------------------------------------------
diff --git a/tests/functional/test_static.py b/tests/functional/test_static.py
new file mode 100644
index 0000000..5a69189
--- /dev/null
+++ b/tests/functional/test_static.py
@@ -0,0 +1,30 @@
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from allura.tests import TestController
+
+
+class TestStatic(TestController):
+
+ def test_static_controller(self):
+ self.app.get('/nf/_static_/wiki/js/browse.js')
+ self.app.get('/nf/_static_/wiki/js/no_such_file.js', status=404)
+ self.app.get('/nf/_static_/no_such_tool/js/comments.js', status=404)
http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/functional/test_subscriber.py
----------------------------------------------------------------------
diff --git a/tests/functional/test_subscriber.py b/tests/functional/test_subscriber.py
new file mode 100644
index 0000000..67c2e39
--- /dev/null
+++ b/tests/functional/test_subscriber.py
@@ -0,0 +1,43 @@
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from allura.tests import TestController
+from allura.tests import decorators as td
+from allura.model.notification import Mailbox
+from allura import model as M
+
+
+class TestSubscriber(TestController):
+
+ @td.with_user_project('test-admin')
+ @td.with_wiki
+ def test_add_subscriber(self):
+
+ response = self.app.get("/nf/admin/add_subscribers")
+ assert "<h1>Add Subscribers to Artifact</h1>" in response
+
+ self.app.post("/nf/admin/add_subscribers", params=dict(
+ for_user="root",
+ artifact_url="http://localhost:8080/u/test-admin/wiki/Home/"))
+
+ assert 1 == Mailbox.query.find(dict(
+ user_id=M.User.by_username("root")._id,
+ artifact_url="/u/test-admin/wiki/Home/")).count()
http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/functional/test_tool_list.py
----------------------------------------------------------------------
diff --git a/tests/functional/test_tool_list.py b/tests/functional/test_tool_list.py
new file mode 100644
index 0000000..44e83e5
--- /dev/null
+++ b/tests/functional/test_tool_list.py
@@ -0,0 +1,48 @@
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from allura.tests import TestController
+from allura.tests import decorators as td
+
+
+class TestToolListController(TestController):
+
+ @td.with_wiki
+ @td.with_tool('test', 'Wiki', 'wiki2')
+ def test_default(self):
+ """Test that list page contains a link to all tools of that type."""
+ r = self.app.get('/p/test/_list/wiki')
+ content = r.html.find('div', id='content_base')
+ assert content.find('a', dict(href='/p/test/wiki/')), r
+ assert content.find('a', dict(href='/p/test/wiki2/')), r
+
+ @td.with_wiki
+ @td.with_tool('test', 'Wiki', 'wiki2')
+ def test_paging(self):
+ """Test that list page handles paging correctly."""
+ r = self.app.get('/p/test/_list/wiki?limit=1&page=0')
+ content = r.html.find('div', id='content_base')
+ assert content.find('a', dict(href='/p/test/wiki/')), r
+ assert not content.find('a', dict(href='/p/test/wiki2/')), r
+ r = self.app.get('/p/test/_list/wiki?limit=1&page=1')
+ content = r.html.find('div', id='content_base')
+ assert not content.find('a', dict(href='/p/test/wiki/')), r
+ assert content.find('a', dict(href='/p/test/wiki2/')), r
http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/functional/test_trovecategory.py
----------------------------------------------------------------------
diff --git a/tests/functional/test_trovecategory.py b/tests/functional/test_trovecategory.py
new file mode 100644
index 0000000..3284711
--- /dev/null
+++ b/tests/functional/test_trovecategory.py
@@ -0,0 +1,124 @@
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+from BeautifulSoup import BeautifulSoup
+import mock
+
+from tg import config
+from nose.tools import assert_equals, assert_true
+from ming.orm import session
+
+from allura import model as M
+from allura.lib import helpers as h
+from allura.tests import TestController
+from alluratest.controller import setup_trove_categories
+from allura.tests import decorators as td
+
+
+class TestTroveCategory(TestController):
+ @mock.patch('allura.model.project.g.post_event')
+ def test_events(self, post_event):
+ setup_trove_categories()
+
+ # Create event
+ cfg = {'trovecategories.enableediting': 'true'}
+ with h.push_config(config, **cfg):
+ r = self.app.post('/categories/create/', params=dict(categoryname='test'))
+
+ category_id = post_event.call_args[0][1]
+ assert_true(isinstance(category_id, int))
+ assert_equals(post_event.call_args[0][0], 'trove_category_created')
+ category = M.TroveCategory.query.get(trove_cat_id=category_id)
+
+ # Update event
+ category.fullname = 'test2'
+ session(M.TroveCategory).flush()
+ edited_category_id = post_event.call_args[0][1]
+ assert_true(isinstance(edited_category_id, int))
+ assert_equals(edited_category_id, category_id)
+ assert_equals(post_event.call_args[0][0], 'trove_category_updated')
+
+ # Delete event
+ M.TroveCategory.delete(category)
+ session(M.TroveCategory).flush()
+ deleted_category_id = post_event.call_args[0][1]
+ assert_true(isinstance(deleted_category_id, int))
+ assert_equals(deleted_category_id, category_id)
+ assert_equals(post_event.call_args[0][0], 'trove_category_deleted')
+
+ def test_enableediting_setting(self):
+ def check_access(username=None, status=None):
+ self.app.get('/categories/', status=status,
+ extra_environ=dict(username=username))
+
+ cfg = {'trovecategories.enableediting': 'true'}
+
+ with h.push_config(config, **cfg):
+ check_access(username='test-user', status=200)
+ check_access(username='root', status=200)
+
+ cfg['trovecategories.enableediting'] = 'false'
+ with h.push_config(config, **cfg):
+ check_access(username='test-user', status=403)
+ check_access(username='root', status=403)
+
+ cfg['trovecategories.enableediting'] = 'admin'
+ with h.push_config(config, **cfg):
+ check_access(username='test-user', status=403)
+ check_access(username='root', status=200)
+
+
+class TestTroveCategoryController(TestController):
+ @td.with_tool('test2', 'admin_main', 'admin')
+ def test_trove_hierarchy(self):
+ root_parent = M.TroveCategory(fullname="Root", trove_cat_id=1, trove_parent_id=0)
+ category_a = M.TroveCategory(fullname="CategoryA", trove_cat_id=2, trove_parent_id=1)
+ category_b = M.TroveCategory(fullname="CategoryB", trove_cat_id=3, trove_parent_id=1)
+ child_a = M.TroveCategory(fullname="ChildA", trove_cat_id=4, trove_parent_id=2)
+ child_b = M.TroveCategory(fullname="ChildB", trove_cat_id=5, trove_parent_id=2)
+
+ session(M.TroveCategory).flush()
+
+ r = self.app.get('/categories/browse')
+ rendered_tree = r.html.find('div', {'id': 'content_base'}).find('div').find('div').find('ul')
+ expected = BeautifulSoup("""
+ <ul>
+ <li>Root</li>
+ <ul>
+ <li>CategoryA</li>
+ <ul>
+ <li>ChildA</li>
+ <li>ChildB</li>
+ </ul>
+ <li>CategoryB</li>
+ </ul>
+ </ul>
+ """.strip())
+ assert str(expected) == str(rendered_tree)
+
+ @td.with_tool('test2', 'admin_main', 'admin')
+ def test_trove_empty_hierarchy(self):
+ r = self.app.get('/categories/browse')
+ rendered_tree = r.html.find('div', {'id': 'content_base'}).find('div').find('div').find('ul')
+ expected = BeautifulSoup("""
+ <ul>
+ </ul>
+ """.strip())
+ assert str(expected) == str(rendered_tree)
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/functional/test_user_profile.py
----------------------------------------------------------------------
diff --git a/tests/functional/test_user_profile.py b/tests/functional/test_user_profile.py
new file mode 100644
index 0000000..6f1a939
--- /dev/null
+++ b/tests/functional/test_user_profile.py
@@ -0,0 +1,180 @@
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import mock
+import tg
+from nose.tools import assert_equal, assert_in, assert_not_in
+
+from allura.model import Project, User
+from allura.tests import decorators as td
+from allura.tests import TestController
+
+
+class TestUserProfile(TestController):
+
+ @td.with_user_project('test-admin')
+ def test_profile(self):
+ r = self.app.get('/u/test-admin/profile/')
+ assert_equal('Test Admin',
+ r.html.find('h1', 'project_title').find('a').text)
+ sections = set([c for s in r.html.findAll(None, 'profile-section') for c in s['class'].split()])
+ assert_in('personal-data', sections)
+ assert_in('Username:test-admin', r.html.find(None, 'personal-data').getText())
+ assert_in('projects', sections)
+ assert_in('Test Project', r.html.find(None, 'projects').getText())
+ assert_in('Last Updated:less than 1 minute ago', r.html.find(None, 'projects').getText())
+ assert_in('tools', sections)
+ assert_in('Admin', r.html.find(None, 'tools').getText())
+ assert_in('skills', sections)
+ assert_in('No skills entered', r.html.find(None, 'skills').getText())
+
+ def test_wrong_profile(self):
+ self.app.get('/u/no-such-user/profile/', status=404)
+
+ @td.with_user_project('test-user')
+ def test_missing_user(self):
+ User.query.remove(dict(username='test-user'))
+ p = Project.query.get(shortname='u/test-user')
+ assert p is not None and p.is_user_project
+ response = self.app.get('/u/test-user/profile/', status=404)
+
+ @td.with_user_project('test-admin')
+ @td.with_wiki
+ def test_feed(self):
+ for ext in ['', '.rss', '.atom']:
+ r = self.app.get('/u/test-admin/profile/feed%s' % ext, status=200)
+ assert 'Recent posts by Test Admin' in r
+ assert 'Home modified by Test Admin' in r
+
+ @td.with_user_project('test-admin')
+ @td.with_user_project('test-user')
+ @mock.patch('allura.tasks.mail_tasks.sendsimplemail')
+ @mock.patch('allura.lib.helpers.gen_message_id')
+ @mock.patch('allura.model.User.can_send_user_message')
+ def test_send_message(self, check, gen_message_id, sendsimplemail):
+ check.return_value = True
+ gen_message_id.return_value = 'id'
+ test_user = User.by_username('test-user')
+ test_user.set_pref('email_address', 'test-user@example.com')
+ response = self.app.get(
+ '/u/test-user/profile/send_message', status=200)
+ assert '<b>From:</b> "Test Admin" <test-admin@users.localhost>' in response
+ self.app.post('/u/test-user/profile/send_user_message',
+ params={'subject': 'test subject',
+ 'message': 'test message',
+ 'cc': 'on'})
+
+ sendsimplemail.post.assert_called_once_with(
+ cc=User.by_username('test-admin').get_pref('email_address'),
+ text='test message\n\n---\n\nThis message was sent to you via the Allura web mail form. You may reply to this message directly, or send a message to Test Admin at http://localhost:8080/u/test-admin/profile/send_message\n',
+ toaddr=User.by_username('test-user').get_pref('email_address'),
+ fromaddr=User.by_username('test-admin').get_pref('email_address'),
+ reply_to=User.by_username('test-admin').get_pref('email_address'),
+ message_id='id',
+ subject='test subject')
+ sendsimplemail.reset_mock()
+ self.app.post('/u/test-user/profile/send_user_message',
+ params={'subject': 'test subject',
+ 'message': 'test message'})
+
+ sendsimplemail.post.assert_called_once_with(
+ cc=None,
+ text='test message\n\n---\n\nThis message was sent to you via the Allura web mail form. You may reply to this message directly, or send a message to Test Admin at http://localhost:8080/u/test-admin/profile/send_message\n',
+ toaddr=User.by_username('test-user').get_pref('email_address'),
+ fromaddr=User.by_username('test-admin').get_pref('email_address'),
+ reply_to=User.by_username('test-admin').get_pref('email_address'),
+ message_id='id',
+ subject='test subject')
+
+ check.return_value = False
+ response = self.app.get(
+ '/u/test-user/profile/send_message', status=200)
+ assert 'Sorry, messaging is rate-limited' in response
+
+ @td.with_user_project('test-user')
+ def test_send_message_for_anonymous(self):
+ r = self.app.get('/u/test-user/profile/send_message',
+ extra_environ={'username': '*anonymous'},
+ status=302)
+ assert 'You must be logged in to send user messages.' in self.webflash(
+ r)
+
+ r = self.app.post('/u/test-user/profile/send_user_message',
+ params={'subject': 'test subject',
+ 'message': 'test message',
+ 'cc': 'on'},
+ extra_environ={'username': '*anonymous'},
+ status=302)
+ assert 'You must be logged in to send user messages.' in self.webflash(
+ r)
+
+ @td.with_user_project('test-user')
+ def test_link_to_send_message_form(self):
+ User.by_username('test-admin').set_pref('email_address',
+ 'admin@example.com')
+ User.by_username('test-user').set_pref('email_address',
+ 'user@example.com')
+ r = self.app.get('/u/test-user/profile',
+ status=200)
+ assert r.html.find('a', dict(href='send_message'))
+
+ @td.with_user_project('test-user')
+ def test_disable_user_messages(self):
+ User.by_username('test-admin').set_pref('email_address',
+ 'admin@example.com')
+ test_user = User.by_username('test-user')
+ test_user.set_pref('email_address', 'user@example.com')
+ test_user.set_pref('disable_user_messages', True)
+ r = self.app.get('/u/test-user/profile')
+ assert '<a href="send_message">Send me a message</a>' not in r
+ r = self.app.get('/u/test-user/profile/send_message', status=302)
+ assert 'This user has disabled direct email messages' in self.webflash(
+ r)
+
+ @td.with_user_project('test-user')
+ def test_profile_sections(self):
+ project = Project.query.get(shortname='u/test-user')
+ app = project.app_instance('profile')
+
+ def ep(n):
+ m = mock.Mock()
+ m.name = n
+ m.load()().display.return_value = 'Section %s' % n
+ return m
+ eps = list(map(ep, ['a', 'b', 'c', 'd']))
+ order = {'user_profile_sections.order': 'b, d,c , f '}
+ if hasattr(type(app), '_sections'):
+ delattr(type(app), '_sections')
+ with mock.patch('allura.lib.helpers.iter_entry_points') as iep:
+ with mock.patch.dict(tg.config, order):
+ iep.return_value = eps
+ sections = app.profile_sections
+ assert_equal(sections, [
+ eps[1].load(),
+ eps[3].load(),
+ eps[2].load(),
+ eps[0].load()])
+ r = self.app.get('/u/test-user/profile')
+ assert_in('Section a', r.body)
+ assert_in('Section b', r.body)
+ assert_in('Section c', r.body)
+ assert_in('Section d', r.body)
+ assert_not_in('Section f', r.body)
http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/model/__init__.py
----------------------------------------------------------------------
diff --git a/tests/model/__init__.py b/tests/model/__init__.py
new file mode 100644
index 0000000..39c107f
--- /dev/null
+++ b/tests/model/__init__.py
@@ -0,0 +1,24 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""Model test suite for the models of the application."""
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
http://git-wip-us.apache.org/repos/asf/allura/blob/d52f8e2a/tests/model/test_artifact.py
----------------------------------------------------------------------
diff --git a/tests/model/test_artifact.py b/tests/model/test_artifact.py
new file mode 100644
index 0000000..fb6ee43
--- /dev/null
+++ b/tests/model/test_artifact.py
@@ -0,0 +1,229 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Model tests for artifact
+"""
+from __future__ import print_function
+from __future__ import division
+from __future__ import absolute_import
+from __future__ import unicode_literals
+import re
+from datetime import datetime
+
+from pylons import tmpl_context as c
+from nose.tools import assert_raises, assert_equal
+from nose import with_setup
+from mock import patch
+from ming.orm.ormsession import ThreadLocalORMSession
+from ming.orm import Mapper
+from bson import ObjectId
+from webob import Request
+
+import allura
+from allura import model as M
+from allura.lib import helpers as h
+from allura.lib import security
+from allura.tests import decorators as td
+from allura.websetup.schema import REGISTRY
+from alluratest.controller import setup_basic_test, setup_unit_test
+from forgewiki import model as WM
+
+
+class Checkmessage(M.Message):
+
+ class __mongometa__:
+ name = 'checkmessage'
+
+ def url(self):
+ return ''
+
+ def __init__(self, **kw):
+ super(Checkmessage, self).__init__(**kw)
+ if self.slug is not None and self.full_slug is None:
+ self.full_slug = datetime.utcnow().strftime(
+ '%Y%m%d%H%M%S') + ':' + self.slug
+Mapper.compile_all()
+
+
+def setUp():
+ setup_basic_test()
+ setup_unit_test()
+ setup_with_tools()
+
+
+@td.with_wiki
+def setup_with_tools():
+ h.set_context('test', 'wiki', neighborhood='Projects')
+ Checkmessage.query.remove({})
+ WM.Page.query.remove({})
+ WM.PageHistory.query.remove({})
+ M.Shortlink.query.remove({})
+ c.user = M.User.query.get(username='test-admin')
+ Checkmessage.project = c.project
+ Checkmessage.app_config = c.app.config
+
+
+def tearDown():
+ ThreadLocalORMSession.close_all()
+
+
+@with_setup(setUp, tearDown)
+def test_artifact():
+ pg = WM.Page(title='TestPage1')
+ assert pg.project == c.project
+ assert pg.project_id == c.project._id
+ assert pg.app.config == c.app.config
+ assert pg.app_config == c.app.config
+ u = M.User.query.get(username='test-user')
+ pr = M.ProjectRole.by_user(u, upsert=True)
+ ThreadLocalORMSession.flush_all()
+ REGISTRY.register(allura.credentials, allura.lib.security.Credentials())
+ assert not security.has_access(pg, 'delete')(user=u)
+ pg.acl.append(M.ACE.allow(pr._id, 'delete'))
+ ThreadLocalORMSession.flush_all()
+ assert security.has_access(pg, 'delete')(user=u)
+ pg.acl.pop()
+ ThreadLocalORMSession.flush_all()
+ assert not security.has_access(pg, 'delete')(user=u)
+
+
+def test_artifact_index():
+ pg = WM.Page(title='TestPage1')
+ idx = pg.index()
+ assert 'title' in idx
+ assert 'url_s' in idx
+ assert 'project_id_s' in idx
+ assert 'mount_point_s' in idx
+ assert 'type_s' in idx
+ assert 'id' in idx
+ assert idx['id'] == pg.index_id()
+ assert 'text' in idx
+ assert 'TestPage' in pg.shorthand_id()
+ assert pg.link_text() == pg.shorthand_id()
+
+
+@with_setup(setUp, tearDown)
+def test_artifactlink():
+ pg = WM.Page(title='TestPage2')
+ q = M.Shortlink.query.find(dict(
+ project_id=c.project._id,
+ app_config_id=c.app.config._id,
+ link=pg.shorthand_id()))
+ assert q.count() == 0
+ ThreadLocalORMSession.flush_all()
+ M.MonQTask.run_ready()
+ ThreadLocalORMSession.flush_all()
+ assert q.count() == 1
+ assert M.Shortlink.lookup('[TestPage2]')
+ assert M.Shortlink.lookup('[wiki:TestPage2]')
+ assert M.Shortlink.lookup('[test:wiki:TestPage2]')
+ assert not M.Shortlink.lookup('[test:wiki:TestPage2:foo]')
+ assert not M.Shortlink.lookup('[Wiki:TestPage2]')
+ assert not M.Shortlink.lookup('[TestPage2_no_such_page]')
+ c.project.uninstall_app('wiki')
+ ThreadLocalORMSession.flush_all()
+ assert not M.Shortlink.lookup('[wiki:TestPage2]')
+ pg.delete()
+ ThreadLocalORMSession.flush_all()
+ M.MonQTask.run_ready()
+ ThreadLocalORMSession.flush_all()
+ assert q.count() == 0
+
+
+@with_setup(setUp, tearDown)
+def test_gen_messageid():
+ assert re.match(r'[0-9a-zA-Z]*.wiki@test.p.localhost',
+ h.gen_message_id())
+
+
+@with_setup(setUp, tearDown)
+def test_gen_messageid_with_id_set():
+ oid = ObjectId()
+ assert re.match(r'%s.wiki@test.p.localhost' %
+ str(oid), h.gen_message_id(oid))
+
+
+@with_setup(setUp, tearDown)
+def test_artifact_messageid():
+ p = WM.Page(title='T')
+ assert re.match(r'%s.wiki@test.p.localhost' %
+ str(p._id), p.message_id())
+
+
+@with_setup(setUp, tearDown)
+def test_versioning():
+ pg = WM.Page(title='TestPage3')
+ with patch('allura.model.artifact.request',
+ Request.blank('/', remote_addr='1.1.1.1')):
+ pg.commit()
+ ThreadLocalORMSession.flush_all()
+ pg.text = 'Here is some text'
+ pg.commit()
+ ThreadLocalORMSession.flush_all()
+ ss = pg.get_version(1)
+ assert ss.author.logged_ip == '1.1.1.1'
+ assert ss.index()['is_history_b']
+ assert ss.shorthand_id() == pg.shorthand_id() + '#1'
+ assert ss.title == pg.title
+ assert ss.text != pg.text
+ ss = pg.get_version(-1)
+ assert ss.index()['is_history_b']
+ assert ss.shorthand_id() == pg.shorthand_id() + '#2'
+ assert ss.title == pg.title
+ assert ss.text == pg.text
+ assert_raises(IndexError, pg.get_version, 42)
+ pg.revert(1)
+ pg.commit()
+ ThreadLocalORMSession.flush_all()
+ assert ss.text != pg.text
+ assert pg.history().count() == 3
+
+
+@with_setup(setUp, tearDown)
+def test_messages_unknown_lookup():
+ from bson import ObjectId
+ m = Checkmessage()
+ m.author_id = ObjectId() # something new
+ assert type(m.author()) == M.User, type(m.author())
+ assert m.author() == M.User.anonymous()
+
+
+@with_setup(setUp, tearDown)
+@patch('allura.model.artifact.datetime')
+def test_last_updated(_datetime):
+ c.project.last_updated = datetime(2014, 1, 1)
+ _datetime.utcnow.return_value = datetime(2014, 1, 2)
+ WM.Page(title='TestPage1')
+ ThreadLocalORMSession.flush_all()
+ assert_equal(c.project.last_updated, datetime(2014, 1, 2))
+
+
+@with_setup(setUp, tearDown)
+@patch('allura.model.artifact.datetime')
+def test_last_updated_disabled(_datetime):
+ c.project.last_updated = datetime(2014, 1, 1)
+ _datetime.utcnow.return_value = datetime(2014, 1, 2)
+ try:
+ M.artifact_orm_session._get().skip_last_updated = True
+ WM.Page(title='TestPage1')
+ ThreadLocalORMSession.flush_all()
+ assert_equal(c.project.last_updated, datetime(2014, 1, 1))
+ finally:
+ M.artifact_orm_session._get().skip_last_updated = False