You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wi...@apache.org on 2014/06/10 01:54:14 UTC
git commit: Implement a TRequestsTransport as a prelude to kerberization.
Repository: incubator-aurora
Updated Branches:
refs/heads/master 0cdeba5f2 -> 5665d4ce8
Implement a TRequestsTransport as a prelude to kerberization.
This is the first bit of AURORA-515. To add kerberos support, we just need
to add a dependency on requests_kerberos and inject KerberosAuth() as the
'auth=' parameter to TRequestsTransport with the proper service principal
specified.
Testing Done:
Updated client/api tests for the new transport.
Reviewed at https://reviews.apache.org/r/22280/
Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/5665d4ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/5665d4ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/5665d4ce
Branch: refs/heads/master
Commit: 5665d4ce80a42ac9a971a6f641de5109acabc967
Parents: 0cdeba5
Author: Brian Wickman <wi...@apache.org>
Authored: Mon Jun 9 16:54:06 2014 -0700
Committer: Brian Wickman <wi...@apache.org>
Committed: Mon Jun 9 16:54:06 2014 -0700
----------------------------------------------------------------------
3rdparty/python/BUILD | 2 +-
src/main/python/apache/aurora/client/api/BUILD | 1 +
.../aurora/client/api/scheduler_client.py | 57 +++---
src/main/python/apache/aurora/common/BUILD | 9 +
.../python/apache/aurora/common/transport.py | 110 ++++++++++++
.../aurora/client/api/test_scheduler_client.py | 177 +++++++------------
src/test/python/apache/aurora/common/BUILD | 11 ++
.../apache/aurora/common/test_transport.py | 84 +++++++++
8 files changed, 310 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5665d4ce/3rdparty/python/BUILD
----------------------------------------------------------------------
diff --git a/3rdparty/python/BUILD b/3rdparty/python/BUILD
index f2e94d2..c9f63cf 100644
--- a/3rdparty/python/BUILD
+++ b/3rdparty/python/BUILD
@@ -31,7 +31,7 @@ python_requirement('mox==0.5.3')
python_requirement('psutil==1.1.2')
python_requirement('pystachio==0.7.2')
python_requirement('pyyaml==3.10')
-python_requirement('requests==2.0.0')
+python_requirement('requests==2.3.0')
python_requirement('thrift==0.9.1')
python_requirement('twitter.common.app==%s' % COMMONS_VERSION)
python_requirement('twitter.common.collections==%s' % COMMONS_VERSION)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5665d4ce/src/main/python/apache/aurora/client/api/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/BUILD b/src/main/python/apache/aurora/client/api/BUILD
index 6968d62..c205a7d 100644
--- a/src/main/python/apache/aurora/client/api/BUILD
+++ b/src/main/python/apache/aurora/client/api/BUILD
@@ -82,6 +82,7 @@ python_library(
pants('3rdparty/python:twitter.common.zookeeper'),
pants('src/main/python/apache/aurora/common/auth'),
pants('src/main/python/apache/aurora/common:cluster'),
+ pants('src/main/python/apache/aurora/common:transport'),
pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
]
)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5665d4ce/src/main/python/apache/aurora/client/api/scheduler_client.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/scheduler_client.py b/src/main/python/apache/aurora/client/api/scheduler_client.py
index 7c5fba6..10a956c 100644
--- a/src/main/python/apache/aurora/client/api/scheduler_client.py
+++ b/src/main/python/apache/aurora/client/api/scheduler_client.py
@@ -27,10 +27,16 @@ from twitter.common.zookeeper.serverset import ServerSet
from apache.aurora.common.auth import make_session_key, SessionKeyError
from apache.aurora.common.cluster import Cluster
+from apache.aurora.common.transport import TRequestsTransport
from gen.apache.aurora.api import AuroraAdmin
from gen.apache.aurora.api.constants import CURRENT_API_VERSION
+try:
+ from urlparse import urljoin
+except ImportError:
+ from urllib.parse import urljoin
+
class SchedulerClientTrait(Cluster.Trait):
zk = String # noqa
@@ -59,12 +65,7 @@ class SchedulerClient(object):
if cluster.zk:
return ZookeeperSchedulerClient(cluster, port=cluster.zk_port, **kwargs)
elif cluster.scheduler_uri:
- try:
- host, port = cluster.scheduler_uri.split(':', 2)
- port = int(port)
- except ValueError:
- raise ValueError('Malformed Cluster scheduler_uri: %s' % cluster.scheduler_uri)
- return DirectSchedulerClient(host, port)
+ return DirectSchedulerClient(cluster.scheduler_uri)
else:
raise ValueError('"cluster" does not specify zk or scheduler_uri')
@@ -83,8 +84,8 @@ class SchedulerClient(object):
return None
@classmethod
- def _connect_scheduler(cls, host, port, clock=time):
- transport = THttpClient.THttpClient('http://%s:%s/api' % (host, port))
+ def _connect_scheduler(cls, uri, clock=time):
+ transport = TRequestsTransport(uri)
protocol = TJSONProtocol.TJSONProtocol(transport)
schedulerClient = AuroraAdmin.Client(protocol)
for _ in range(cls.THRIFT_RETRIES):
@@ -98,7 +99,7 @@ class SchedulerClient(object):
# Monkey-patched proxies, like socks, can generate a proxy error here.
# without adding a dependency, we can't catch those in a more specific way.
raise cls.CouldNotConnect('Connection to scheduler failed: %s' % e)
- raise cls.CouldNotConnect('Could not connect to %s:%s' % (host, port))
+ raise cls.CouldNotConnect('Could not connect to %s' % uri)
class ZookeeperSchedulerClient(SchedulerClient):
@@ -118,9 +119,11 @@ class ZookeeperSchedulerClient(SchedulerClient):
SchedulerClient.__init__(self, verbose=verbose)
self._cluster = cluster
self._zkport = port
- self._http = None
+ self._endpoint = None
+ self._uri = None
- def _connect(self):
+ def _resolve(self):
+ """Resolve the uri associated with this scheduler from zookeeper."""
joined = threading.Event()
def on_join(elements):
joined.set()
@@ -131,34 +134,42 @@ class ZookeeperSchedulerClient(SchedulerClient):
if len(serverset_endpoints) == 0:
raise self.CouldNotConnect('No schedulers detected in %s!' % self._cluster.name)
instance = serverset_endpoints[0]
- self._http = instance.additional_endpoints.get('http')
+ if 'https' in instance.additional_endpoints:
+ endpoint = instance.additional_endpoints['https']
+ self._uri = 'https://%s:%s' % (endpoint.host, endpoint.port)
+ elif 'http' in instance.additional_endpoints:
+ endpoint = instance.additional_endpoints['http']
+ self._uri = 'http://%s:%s' % (endpoint.host, endpoint.port)
zk.stop()
- return self._connect_scheduler(self._http.host, self._http.port)
+
+ def _connect(self):
+ if self._uri is None:
+ self._resolve()
+ if self._uri is not None:
+ return self._connect_scheduler(urljoin(self._uri, 'api'))
@property
def url(self):
proxy_url = self._cluster.proxy_url
if proxy_url:
return proxy_url
- if self._http is None:
- self._connect()
- if self._http:
- return 'http://%s:%s' % (self._http.host, self._http.port)
+ if self._uri is None:
+ self._resolve()
+ if self._uri:
+ return self._uri
class DirectSchedulerClient(SchedulerClient):
- def __init__(self, host, port):
+ def __init__(self, uri):
SchedulerClient.__init__(self, verbose=True)
- self._host = host
- self._port = port
+ self._uri = uri
def _connect(self):
- return self._connect_scheduler(self._host, self._port)
+ return self._connect_scheduler(urljoin(self._uri, 'api'))
@property
def url(self):
- # TODO(wickman) This is broken -- make this tunable in MESOS-3005
- return 'http://%s:8081' % self._host
+ return self._uri
class SchedulerProxy(object):
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5665d4ce/src/main/python/apache/aurora/common/BUILD
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/common/BUILD b/src/main/python/apache/aurora/common/BUILD
index 0de0cf7..b2ff1ff 100644
--- a/src/main/python/apache/aurora/common/BUILD
+++ b/src/main/python/apache/aurora/common/BUILD
@@ -67,6 +67,15 @@ python_library(
)
python_library(
+ name = 'transport',
+ sources = ['transport.py'],
+ dependencies = [
+ pants('3rdparty/python:requests'),
+ pants('3rdparty/python:thrift'),
+ ],
+)
+
+python_library(
name = 'common',
dependencies = [
pants(':aurora_job_key'),
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5665d4ce/src/main/python/apache/aurora/common/transport.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/common/transport.py b/src/main/python/apache/aurora/common/transport.py
new file mode 100644
index 0000000..6f7c355
--- /dev/null
+++ b/src/main/python/apache/aurora/common/transport.py
@@ -0,0 +1,110 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from io import BytesIO
+
+import requests
+from requests import exceptions as request_exceptions
+from thrift.transport.TTransport import TTransportBase, TTransportException
+
+try:
+ from urlparse import urlparse
+except ImportError:
+ from urllib.parse import urlparse
+
+
+def default_requests_session_factory():
+ session = requests.session()
+ session.headers['User-Agent'] = 'Python TRequestsTransport v1.0'
+ return session
+
+
+class TRequestsTransport(TTransportBase):
+ """A Thrift HTTP client based upon the requests module."""
+
+ def __init__(self, uri, auth=None, session_factory=default_requests_session_factory):
+ """Construct a TRequestsTransport.
+
+ Construct a Thrift transport based upon the requests module. URI is the
+ HTTP endpoint that the server should be listening on. The 'auth'
+ keyword is passed directly to the requests client and can be used to
+ provide different authentication contexts such as Kerberos
+ authentication via the requests-kerberos module.
+
+ :param uri: The endpoint uri
+ :type uri: str
+ :keyword auth: The requests authentication context.
+ """
+ self.__session = None
+ self.__session_factory = session_factory
+ if not callable(session_factory):
+ raise TypeError('session_factory should be a callable that produces a requests.Session!')
+ self.__wbuf = BytesIO()
+ self.__rbuf = BytesIO()
+ self.__uri = uri
+ try:
+ self.__urlparse = urlparse(uri)
+ except ValueError:
+ raise TTransportException('Failed to parse uri %r' % (uri,))
+ self.__timeout = None
+ self.__auth = auth
+
+ def isOpen(self):
+ return self.__session is not None
+
+ def open(self):
+ self.__session = self.__session_factory()
+
+ def close(self):
+ session, self.__session = self.__session, None
+ session.close()
+
+ def setTimeout(self, ms):
+ self.__timeout = ms / 1000.0
+
+ def read(self, size):
+ return self.__rbuf.read(size)
+
+ def write(self, buf):
+ self.__wbuf.write(buf)
+
+ def flush(self):
+ if self.isOpen():
+ self.close()
+
+ self.open()
+
+ data = self.__wbuf.getvalue()
+ self.__wbuf = BytesIO()
+
+ self.__session.headers['Content-Type'] = 'application/x-thrift'
+ self.__session.headers['Content-Length'] = str(len(data))
+ self.__session.headers['Host'] = self.__urlparse.hostname
+
+ try:
+ response = self.__session.post(
+ self.__uri,
+ data=data,
+ timeout=self.__timeout,
+ auth=self.__auth)
+ except request_exceptions.Timeout:
+ raise TTransportException(
+ type=TTransportException.TIMED_OUT,
+ message='Timed out talking to %s' % self.__uri)
+ except request_exceptions.RequestException as e:
+ raise TTransportException(
+ type=TTransportException.UNKNOWN,
+ message='Unknown error talking to %s: %s' % (self.__uri, e))
+
+ self.__rbuf = BytesIO(response.content)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5665d4ce/src/test/python/apache/aurora/client/api/test_scheduler_client.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/api/test_scheduler_client.py b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
index 6b23a4a..dd16fe2 100644
--- a/src/test/python/apache/aurora/client/api/test_scheduler_client.py
+++ b/src/test/python/apache/aurora/client/api/test_scheduler_client.py
@@ -17,6 +17,7 @@ import time
import unittest
import mock
+import pytest
from mox import IgnoreArg, IsA, Mox
from thrift.transport import THttpClient, TTransport
from twitter.common.quantity import Amount, Time
@@ -25,6 +26,7 @@ from twitter.common.zookeeper.serverset.endpoint import ServiceInstance
import apache.aurora.client.api.scheduler_client as scheduler_client
from apache.aurora.common.cluster import Cluster
+from apache.aurora.common.transport import TRequestsTransport
import gen.apache.aurora.api.AuroraAdmin as AuroraAdmin
import gen.apache.aurora.api.AuroraSchedulerManager as AuroraSchedulerManager
@@ -98,23 +100,17 @@ class TestSchedulerProxyInjection(unittest.TestCase):
def test_startCronJob(self):
self.mock_thrift_client.startCronJob(IsA(JobKey), IsA(SessionKey))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().startCronJob(JOB_KEY)
def test_createJob(self):
self.mock_thrift_client.createJob(IsA(JobConfiguration), IsA(SessionKey))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().createJob(JobConfiguration())
def test_replaceCronTemplate(self):
self.mock_thrift_client.replaceCronTemplate(IsA(JobConfiguration), IsA(Lock), IsA(SessionKey))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().replaceCronTemplate(JobConfiguration(), Lock())
def test_scheduleCronJob(self):
@@ -129,240 +125,187 @@ class TestSchedulerProxyInjection(unittest.TestCase):
def test_populateJobConfig(self):
self.mock_thrift_client.populateJobConfig(IsA(JobConfiguration))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().populateJobConfig(JobConfiguration())
def test_restartShards(self):
self.mock_thrift_client.restartShards(IsA(JobKey), IgnoreArg(), IsA(SessionKey))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().restartShards(JOB_KEY, set([0]))
def test_getTasksStatus(self):
self.mock_thrift_client.getTasksStatus(IsA(TaskQuery))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().getTasksStatus(TaskQuery())
def test_getJobs(self):
self.mock_thrift_client.getJobs(IgnoreArg())
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().getJobs(ROLE)
def test_killTasks(self):
self.mock_thrift_client.killTasks(IsA(TaskQuery), IsA(SessionKey))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().killTasks(TaskQuery())
def test_getQuota(self):
self.mock_thrift_client.getQuota(IgnoreArg())
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().getQuota(ROLE)
def test_startMaintenance(self):
self.mock_thrift_client.startMaintenance(IsA(Hosts), IsA(SessionKey))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().startMaintenance(Hosts())
def test_drainHosts(self):
self.mock_thrift_client.drainHosts(IsA(Hosts), IsA(SessionKey))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().drainHosts(Hosts())
def test_maintenanceStatus(self):
self.mock_thrift_client.maintenanceStatus(IsA(Hosts), IsA(SessionKey))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().maintenanceStatus(Hosts())
def test_endMaintenance(self):
self.mock_thrift_client.endMaintenance(IsA(Hosts), IsA(SessionKey))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().endMaintenance(Hosts())
def test_getVersion(self):
self.mock_thrift_client.getVersion()
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().getVersion()
def test_addInstances(self):
self.mock_thrift_client.addInstances(IsA(JobKey), IgnoreArg(), IsA(Lock), IsA(SessionKey))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().addInstances(JobKey(), {}, Lock())
def test_acquireLock(self):
self.mock_thrift_client.acquireLock(IsA(Lock), IsA(SessionKey))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().acquireLock(Lock())
def test_releaseLock(self):
self.mock_thrift_client.releaseLock(IsA(Lock), IsA(LockValidation), IsA(SessionKey))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().releaseLock(Lock(), LockValidation())
class TestSchedulerProxyAdminInjection(TestSchedulerProxyInjection):
def test_setQuota(self):
self.mock_thrift_client.setQuota(IgnoreArg(), IsA(ResourceAggregate), IsA(SessionKey))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().setQuota(ROLE, ResourceAggregate())
def test_forceTaskState(self):
self.mock_thrift_client.forceTaskState(IgnoreArg(), IgnoreArg(), IsA(SessionKey))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().forceTaskState('taskid', ScheduleStatus.LOST)
def test_performBackup(self):
self.mock_thrift_client.performBackup(IsA(SessionKey))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().performBackup()
def test_listBackups(self):
self.mock_thrift_client.listBackups(IsA(SessionKey))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().listBackups()
def test_stageRecovery(self):
self.mock_thrift_client.stageRecovery(IsA(TaskQuery), IsA(SessionKey))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().stageRecovery(TaskQuery())
def test_queryRecovery(self):
self.mock_thrift_client.queryRecovery(IsA(TaskQuery), IsA(SessionKey))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().queryRecovery(TaskQuery())
def test_deleteRecoveryTasks(self):
self.mock_thrift_client.deleteRecoveryTasks(IsA(TaskQuery), IsA(SessionKey))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().deleteRecoveryTasks(TaskQuery())
def test_commitRecovery(self):
self.mock_thrift_client.commitRecovery(IsA(SessionKey))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().commitRecovery()
def test_unloadRecovery(self):
self.mock_thrift_client.unloadRecovery(IsA(SessionKey))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().unloadRecovery()
def test_snapshot(self):
self.mock_thrift_client.snapshot(IsA(SessionKey))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().snapshot()
def test_rewriteConfigs(self):
self.mock_thrift_client.rewriteConfigs(IsA(RewriteConfigsRequest), IsA(SessionKey))
-
self.mox.ReplayAll()
-
self.make_scheduler_proxy().rewriteConfigs(RewriteConfigsRequest())
-class TestZookeeperSchedulerClient(unittest.TestCase):
- def setUp(self):
- self.mox = Mox()
-
- def tearDown(self):
- self.mox.UnsetStubs()
- self.mox.VerifyAll()
-
- def test_url_when_not_connected_and_cluster_has_no_proxy_url(self):
- host = 'some-host.example.com'
- port = 31181
-
- mock_zk = self.mox.CreateMock(TwitterKazooClient)
-
- def mock_get_serverset(*args, **kwargs):
- service_json = '''{
- "additionalEndpoints": {
- "http": {
- "host": "%s",
- "port": %d
- }
- },
- "serviceEndpoint": {
- "host": "%s",
- "port": %d
- },
- "shard": 0,
- "status": "ALIVE"
- }''' % (host, port, host, port)
-
- return mock_zk, [ServiceInstance.unpack(service_json)]
-
- class ZookeeperSchedulerClientTestImpl(scheduler_client.ZookeeperSchedulerClient):
- SERVERSET_TIMEOUT = Amount(10, Time.MILLISECONDS)
-
- original_method = ZookeeperSchedulerClientTestImpl.get_scheduler_serverset
-
- try:
- ZookeeperSchedulerClientTestImpl.get_scheduler_serverset = mock_get_serverset
-
- zk_scheduler_client = ZookeeperSchedulerClientTestImpl(Cluster(proxy_url=None))
- self.mox.StubOutWithMock(zk_scheduler_client, '_connect_scheduler')
- mock_zk.stop()
- zk_scheduler_client._connect_scheduler(host, port)
-
- self.mox.ReplayAll()
-
- assert zk_scheduler_client.url == 'http://%s:%d' % (host, port)
- finally:
- ZookeeperSchedulerClientTestImpl.get_scheduler_serverset = original_method
-
-
-class TestSchedulerClient(unittest.TestCase):
- @mock.patch('thrift.transport.THttpClient.THttpClient', spec=THttpClient.THttpClient)
- def test_connect_scheduler(self, MockTHttpClient):
- MockTHttpClient.return_value.open.side_effect = [TTransport.TTransportException, True]
- mock_time = mock.Mock(spec=time)
- scheduler_client.SchedulerClient._connect_scheduler('scheduler.example.com', 1337, mock_time)
- assert MockTHttpClient.return_value.open.call_count is 2
- mock_time.sleep.assert_called_once_with(
- scheduler_client.SchedulerClient.RETRY_TIMEOUT.as_(Time.SECONDS))
+@pytest.mark.parametrize('scheme', ('http', 'https'))
+def test_url_when_not_connected_and_cluster_has_no_proxy_url(scheme):
+ host = 'some-host.example.com'
+ port = 31181
+
+ mock_zk = mock.MagicMock(spec=TwitterKazooClient)
+
+ service_json = '''{
+ "additionalEndpoints": {
+ "%(scheme)s": {
+ "host": "%(host)s",
+ "port": %(port)d
+ }
+ },
+ "serviceEndpoint": {
+ "host": "%(host)s",
+ "port": %(port)d
+ },
+ "shard": 0,
+ "status": "ALIVE"
+ }''' % dict(host=host, port=port, scheme=scheme)
+
+ service_endpoints = [ServiceInstance.unpack(service_json)]
+
+ def make_mock_client(proxy_url):
+ client = scheduler_client.ZookeeperSchedulerClient(Cluster(proxy_url=proxy_url))
+ client.get_scheduler_serverset = mock.MagicMock(return_value=(mock_zk, service_endpoints))
+ client.SERVERSET_TIMEOUT = Amount(0, Time.SECONDS)
+ client._connect_scheduler = mock.MagicMock()
+ return client
+
+ client = make_mock_client(proxy_url=None)
+ assert client.url == '%s://%s:%d' % (scheme, host, port)
+ client._connect_scheduler.assert_has_calls([])
+
+ client = make_mock_client(proxy_url='https://scheduler.proxy')
+ assert client.url == 'https://scheduler.proxy'
+ client._connect_scheduler.assert_has_calls([])
+
+ client = make_mock_client(proxy_url=None)
+ client.get_thrift_client()
+ assert client.url == '%s://%s:%d' % (scheme, host, port)
+ client._connect_scheduler.assert_has_calls([mock.call('%s://%s:%d/api' % (scheme, host, port))])
+ client._connect_scheduler.reset_mock()
+ client.get_thrift_client()
+ client._connect_scheduler.assert_has_calls([])
+
+
+@mock.patch('apache.aurora.client.api.scheduler_client.TRequestsTransport', spec=TRequestsTransport)
+def test_connect_scheduler(mock_client):
+ mock_client.return_value.open.side_effect = [TTransport.TTransportException, True]
+ mock_time = mock.Mock(spec=time)
+ scheduler_client.SchedulerClient._connect_scheduler(
+ 'https://scheduler.example.com:1337',
+ mock_time)
+ assert mock_client.return_value.open.call_count == 2
+ mock_time.sleep.assert_called_once_with(
+ scheduler_client.SchedulerClient.RETRY_TIMEOUT.as_(Time.SECONDS))
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5665d4ce/src/test/python/apache/aurora/common/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/common/BUILD b/src/test/python/apache/aurora/common/BUILD
index 4fc7788..e949ba8 100644
--- a/src/test/python/apache/aurora/common/BUILD
+++ b/src/test/python/apache/aurora/common/BUILD
@@ -21,6 +21,7 @@ python_test_suite(
pants(':test_cluster_option'),
pants(':test_http_signaler'),
pants(':test_shellify'),
+ pants(':test_transport'),
]
)
@@ -76,3 +77,13 @@ python_tests(
pants('src/main/python/apache/aurora/common:shellify'),
]
)
+
+python_tests(
+ name = 'test_transport',
+ sources = ['test_transport.py'],
+ dependencies = [
+ pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
+ pants('src/main/python/apache/aurora/common:transport'),
+ pants('3rdparty/python:mock'),
+ ]
+)
http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/5665d4ce/src/test/python/apache/aurora/common/test_transport.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/common/test_transport.py b/src/test/python/apache/aurora/common/test_transport.py
new file mode 100644
index 0000000..c9a4141
--- /dev/null
+++ b/src/test/python/apache/aurora/common/test_transport.py
@@ -0,0 +1,84 @@
+from threading import Thread
+
+import mock
+import requests
+from requests import exceptions as request_exceptions
+from thrift.protocol import TJSONProtocol
+from thrift.server import THttpServer
+from thrift.transport import TTransport
+
+from apache.aurora.common.transport import TRequestsTransport
+
+from gen.apache.aurora.api import ReadOnlyScheduler
+from gen.apache.aurora.api.ttypes import Response, ResponseCode, ServerInfo
+
+
+class ReadOnlySchedulerHandler(object):
+ def getRoleSummary(self): # noqa
+ server_info = ServerInfo(clusterName='west', thriftAPIVersion=3)
+ return Response(responseCode=ResponseCode.OK, serverInfo=server_info)
+
+
+def test_request_transport_integration():
+ handler = ReadOnlySchedulerHandler()
+ processor = ReadOnlyScheduler.Processor(handler)
+ pfactory = TJSONProtocol.TJSONProtocolFactory()
+ server = THttpServer.THttpServer(processor, ('localhost', 0), pfactory)
+ server_thread = Thread(target=server.serve)
+ server_thread.start()
+ _, server_port = server.httpd.socket.getsockname()
+
+ response = None
+
+ try:
+ transport = TRequestsTransport('http://localhost:%d' % server_port)
+ protocol = TJSONProtocol.TJSONProtocol(transport)
+ client = ReadOnlyScheduler.Client(protocol)
+ response = client.getRoleSummary()
+ finally:
+ server.httpd.shutdown()
+
+ assert response is not None
+ assert response.responseCode == ResponseCode.OK
+ assert response.serverInfo.clusterName == 'west'
+ assert response.serverInfo.thriftAPIVersion == 3
+
+ transport.close()
+
+
+def test_request_transport_timeout():
+ session = mock.MagicMock(spec=requests.Session)
+ session.headers = {}
+ session.post = mock.Mock(side_effect=request_exceptions.Timeout())
+ transport = TRequestsTransport('http://localhost:12345', session_factory=lambda: session)
+ protocol = TJSONProtocol.TJSONProtocol(transport)
+ client = ReadOnlyScheduler.Client(protocol)
+
+ try:
+ client.getRoleSummary()
+ assert False, 'getRoleSummary should not succeed'
+ except TTransport.TTransportException as e:
+ assert e.message == 'Timed out talking to http://localhost:12345'
+ except Exception as e:
+ assert False, 'Only expected TTransportException, got %s' % e
+
+ transport.close()
+
+
+def test_request_any_other_exception():
+ session = mock.MagicMock(spec=requests.Session)
+ session.headers = {}
+ session.post = mock.Mock(side_effect=request_exceptions.ConnectionError())
+ transport = TRequestsTransport('http://localhost:12345', session_factory=lambda: session)
+ protocol = TJSONProtocol.TJSONProtocol(transport)
+ client = ReadOnlyScheduler.Client(protocol)
+
+ try:
+ client.getRoleSummary()
+ assert False, 'getRoleSummary should not succeed'
+ except TTransport.TTransportException:
+ pass
+ except Exception as e:
+ assert False, 'Only expected TTransportException, got %s' % e
+
+ transport.close()