You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@submarine.apache.org by li...@apache.org on 2020/03/05 01:25:34 UTC
[submarine] branch master updated: SUBMARINE-377. [SDK] Submit a
job by pysubamrine
This is an automated email from the ASF dual-hosted git repository.
liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 39f7a27 SUBMARINE-377. [SDK] Submit a job by pysubamrine
39f7a27 is described below
commit 39f7a275f489de53d16dddc7ea206c30f66a1ed1
Author: pingsutw <pi...@gmail.com>
AuthorDate: Tue Mar 3 17:47:05 2020 +0800
SUBMARINE-377. [SDK] Submit a job by pysubamrine
### What is this PR for?
Use python SDK to submit a submarine job.
### What type of PR is it?
[Improvement]
### Todos
* [ ] - Task
### What is the Jira issue?
https://issues.apache.org/jira/browse/SUBMARINE-377
### How should this be tested?
https://travis-ci.org/pingsutw/hadoop-submarine/builds/648336769
### Screenshots (if appropriate)
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: pingsutw <pi...@gmail.com>
Closes #179 from pingsutw/SUBMARINE-377 and squashes the following commits:
d39a1c0 [pingsutw] Fix the typo
851b0eb [pingsutw] SUBMARINE-377. [SDK] Submit a job by pysubamrine
---
submarine-sdk/pysubmarine/example/client/README.md | 32 ++++++++++
.../pysubmarine/example/client/mnist.json | 31 ++++++++++
submarine-sdk/pysubmarine/setup.py | 1 +
submarine-sdk/pysubmarine/submarine/exceptions.py | 10 +++
.../submarine/{exceptions.py => job/__init__.py} | 12 +---
.../{exceptions.py => job/submarine_job_client.py} | 31 +++++++---
.../pysubmarine/submarine/utils/rest_utils.py | 72 ++++++++++++++++++++++
.../pysubmarine/tests/client/test_client.py | 48 +++++++++++++++
.../pysubmarine/tests/utils/test_rest_utils.py | 59 ++++++++++++++++++
9 files changed, 278 insertions(+), 18 deletions(-)
diff --git a/submarine-sdk/pysubmarine/example/client/README.md b/submarine-sdk/pysubmarine/example/client/README.md
new file mode 100644
index 0000000..4095a7c
--- /dev/null
+++ b/submarine-sdk/pysubmarine/example/client/README.md
@@ -0,0 +1,32 @@
+<!---
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+
+# Submarine Client API Example
+This example shows how you could use Submarine Client API to
+manage submarine tasks
+
+## Prerequisites
+- [Deploy Submarine Server on Kubernetes](https://github.com/apache/submarine/blob/master/docs/submarine-server/setup-kubernetes.md)
+- [Deploy Tensorflow Operator on Kubernetes](https://github.com/apache/submarine/blob/master/docs/submarine-server/ml-frameworks/tensorflow.md)
+
+## Submit Job
+1. Create a job description for submarine client. e.g.[mnist.json](./mnist.json)
+
+2. Create Submarine job client
+```python
+from submarine.job import SubmarineJOBClient
+client = SubmarineJOBClient('localhost', 8080)
+```
+3. Submit job
+```python
+response = client.submit_job('mnist.json')
+```
diff --git a/submarine-sdk/pysubmarine/example/client/mnist.json b/submarine-sdk/pysubmarine/example/client/mnist.json
new file mode 100644
index 0000000..d84c966
--- /dev/null
+++ b/submarine-sdk/pysubmarine/example/client/mnist.json
@@ -0,0 +1,31 @@
+{
+ "name": "mnist",
+ "librarySpec": {
+ "name": "TensorFlow",
+ "version": "2.1.0",
+ "image": "gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0",
+ "cmd": "python /var/tf_mnist/mnist_with_summaries.py --log_dir=/train/log --learning_rate=0.01 --batch_size=150",
+ "envVars": {
+ "ENV_1": "ENV1"
+ }
+ },
+ "submitterSpec": {
+ "type": "k8s",
+ "configPath": null,
+ "namespace": "submarine",
+ "kind": "TFJob",
+ "apiVersion": "kubeflow.org/v1"
+ },
+ "taskSpecs": {
+ "Ps": {
+ "name": "tensorflow",
+ "replicas": 1,
+ "resources": "cpu=1,memory=1024M"
+ },
+ "Worker": {
+ "name": "tensorflow",
+ "replicas": 1,
+ "resources": "cpu=1,memory=1024M"
+ }
+ }
+}
diff --git a/submarine-sdk/pysubmarine/setup.py b/submarine-sdk/pysubmarine/setup.py
index c03f2c1..8e4aac2 100644
--- a/submarine-sdk/pysubmarine/setup.py
+++ b/submarine-sdk/pysubmarine/setup.py
@@ -30,6 +30,7 @@ setup(
'sqlparse',
'pymysql',
'tensorflow>=1.14.0,<2.0.0',
+ 'requests'
],
classifiers=[
'Intended Audience :: Developers',
diff --git a/submarine-sdk/pysubmarine/submarine/exceptions.py b/submarine-sdk/pysubmarine/submarine/exceptions.py
index 6ce1bca..3d760fd 100644
--- a/submarine-sdk/pysubmarine/submarine/exceptions.py
+++ b/submarine-sdk/pysubmarine/submarine/exceptions.py
@@ -24,3 +24,13 @@ class SubmarineException(Exception):
"""
self.message = message
super(SubmarineException, self).__init__(message)
+
+
+class RestException(SubmarineException):
+ """Exception thrown on non 200-level responses from the REST API"""
+ def __init__(self, json):
+ error_code = json.get('error_code')
+ message = "%s: %s" % (error_code,
+ json['message'] if 'message' in json else "Response: " + str(json))
+ super(RestException, self).__init__(message)
+ self.json = json
diff --git a/submarine-sdk/pysubmarine/submarine/exceptions.py b/submarine-sdk/pysubmarine/submarine/job/__init__.py
similarity index 68%
copy from submarine-sdk/pysubmarine/submarine/exceptions.py
copy to submarine-sdk/pysubmarine/submarine/job/__init__.py
index 6ce1bca..3eba92f 100644
--- a/submarine-sdk/pysubmarine/submarine/exceptions.py
+++ b/submarine-sdk/pysubmarine/submarine/job/__init__.py
@@ -13,14 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from submarine.job.submarine_job_client import SubmarineJOBClient
-class SubmarineException(Exception):
- """
- Generic exception thrown to surface failure information about external-facing operations.
- """
- def __init__(self, message):
- """
- :param message: The message describing the error that occured.
- """
- self.message = message
- super(SubmarineException, self).__init__(message)
+__all__ = ['SubmarineJOBClient']
diff --git a/submarine-sdk/pysubmarine/submarine/exceptions.py b/submarine-sdk/pysubmarine/submarine/job/submarine_job_client.py
similarity index 51%
copy from submarine-sdk/pysubmarine/submarine/exceptions.py
copy to submarine-sdk/pysubmarine/submarine/job/submarine_job_client.py
index 6ce1bca..6534ae2 100644
--- a/submarine-sdk/pysubmarine/submarine/exceptions.py
+++ b/submarine-sdk/pysubmarine/submarine/job/submarine_job_client.py
@@ -13,14 +13,29 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+from submarine.utils.rest_utils import http_request
+import logging
+import json
-class SubmarineException(Exception):
- """
- Generic exception thrown to surface failure information about external-facing operations.
- """
- def __init__(self, message):
+_logger = logging.getLogger(__name__)
+
+_PATH_PREFIX = "/api/v1/"
+JOBS = 'jobs'
+
+
+class SubmarineJOBClient:
+ def __init__(self, hostname, port):
+ self.base_url = 'http://' + hostname + ':' + str(port)
+
+ def submit_job(self, conf_path):
"""
- :param message: The message describing the error that occured.
+ Submit a job to submarine server
+ :param conf_path: The location of the configuration file
+ :return: requests.Response
"""
- self.message = message
- super(SubmarineException, self).__init__(message)
+ endpoint = _PATH_PREFIX + JOBS
+ with open(conf_path) as json_file:
+ json_body = json.load(json_file)
+ response = http_request(self.base_url, endpoint=endpoint,
+ method='POST', json_body=json_body)
+ return response
diff --git a/submarine-sdk/pysubmarine/submarine/utils/rest_utils.py b/submarine-sdk/pysubmarine/submarine/utils/rest_utils.py
new file mode 100644
index 0000000..4879954
--- /dev/null
+++ b/submarine-sdk/pysubmarine/submarine/utils/rest_utils.py
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from submarine.exceptions import SubmarineException, RestException
+import json
+import requests
+import logging
+
+_logger = logging.getLogger(__name__)
+
+
+def http_request(base_url, endpoint, method, json_body,
+ timeout=60, headers=None, **kwargs):
+ """
+ Perform requests.
+ :param base_url: http request base url containing hostname and port. e.g. https://submarine:8088
+ :param endpoint: specified as a relative or absolute url
+ :param method: http request method
+ :param json_body: request json body, for `application/json`
+ :param timeout: How many seconds to wait for the server to send data
+ :param headers: Dictionary of HTTP Headers to send with the :class:`Request`.
+ :return:
+ """
+ method = method.upper()
+ assert method in ['GET', 'HEAD', 'DELETE', 'POST', 'PUT',
+ 'PATCH', 'OPTIONS']
+ headers = headers or {}
+ if 'Content-Type' not in headers:
+ headers['Content-Type'] = 'application/json'
+
+ url = base_url + endpoint
+ response = requests.request(url=url, method=method, json=json_body, headers=headers,
+ timeout=timeout, **kwargs)
+ verify_rest_response(response, endpoint)
+
+ print('test')
+ print(response.text)
+ response = json.loads(response.text)
+ result = response['result']
+ return result
+
+
+def _can_parse_as_json(string):
+ try:
+ json.loads(string)
+ return True
+ except Exception: # pylint: disable=broad-except
+ return False
+
+
+def verify_rest_response(response, endpoint):
+ """Verify the return code and raise exception if the request was not successful."""
+ if response.status_code != 200:
+ if _can_parse_as_json(response.text):
+ raise RestException(json.loads(response.text))
+ else:
+ base_msg = "API request to endpoint %s failed with error code " \
+ "%s != 200" % (endpoint, response.status_code)
+ raise SubmarineException("%s. Response body: '%s'" % (base_msg, response.text))
+ return response
diff --git a/submarine-sdk/pysubmarine/tests/client/test_client.py b/submarine-sdk/pysubmarine/tests/client/test_client.py
new file mode 100644
index 0000000..0f4c9c2
--- /dev/null
+++ b/submarine-sdk/pysubmarine/tests/client/test_client.py
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from submarine.job import SubmarineJOBClient
+import mock
+import pytest
+import json
+
+
+@pytest.fixture(scope="function")
+def output_json_filepath():
+ dummy = {'a': 200, 'b': 2, 'c': 3}
+ path = '/tmp/data.json'
+ with open(path, 'w') as f:
+ json.dump(dummy, f)
+ return path
+
+
+@mock.patch('submarine.job.submarine_job_client.http_request')
+class TestSubmarineJobClient:
+ def test_submit_job(self, mock_http_request, output_json_filepath):
+ client = SubmarineJOBClient('submarine', 8080)
+ mock_http_request.return_value = {'jobId': 'job_1582524742595_0040',
+ 'name': 'submarine', 'identifier': 'test'}
+ response = client.submit_job(output_json_filepath)
+
+ with open(output_json_filepath) as json_file:
+ json_body = json.load(json_file)
+
+ mock_http_request.assert_called_with('http://submarine:8080',
+ json_body=json_body,
+ endpoint='/api/v1/jobs', method='POST')
+
+ assert response['jobId'] == 'job_1582524742595_0040'
+ assert response['name'] == 'submarine'
+ assert response['identifier'] == 'test'
diff --git a/submarine-sdk/pysubmarine/tests/utils/test_rest_utils.py b/submarine-sdk/pysubmarine/tests/utils/test_rest_utils.py
new file mode 100644
index 0000000..de5ff5e
--- /dev/null
+++ b/submarine-sdk/pysubmarine/tests/utils/test_rest_utils.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from mock import patch, Mock
+import pytest
+import json
+from submarine.utils.rest_utils import http_request, verify_rest_response
+from submarine.exceptions import RestException, SubmarineException
+
+
+def test_http_request():
+ dummy_json = json.dumps({'result': {'jobId': 'job_1234567', 'name': 'submarine',
+ 'identifier': 'test'}})
+
+ with patch('requests.request') as mock_requests:
+ mock_requests.return_value.text = dummy_json
+ mock_requests.return_value.status_code = 200
+
+ result = http_request('http://submarine:8080', json_body='dummy',
+ endpoint='/api/v1/jobs', method='POST')
+
+ assert result['jobId'] == 'job_1234567'
+ assert result['name'] == 'submarine'
+ assert result['identifier'] == 'test'
+
+
+def test_verify_rest_response():
+ # Test correct response
+ mock_response = Mock()
+ mock_response.status_code = 200
+ verify_rest_response(mock_response, '/api/v1/jobs')
+
+ # Test response status code not equal 200(OK) and response can parse as JSON
+ mock_response.status_code = 400
+ mock_json_body = {'a': 200, 'b': 2, 'c': 3}
+ dummy_json = json.dumps(mock_json_body)
+ mock_response.text = dummy_json
+
+ with pytest.raises(RestException, match=str(mock_json_body)):
+ verify_rest_response(mock_response, '/api/v1/jobs')
+
+ # Test response status code not equal 200(OK) and response can not parse as JSON
+ mock_json_body = 'test, 123'
+ mock_response.text = mock_json_body
+ with pytest.raises(SubmarineException, match='API request to endpoint /api/v1/jobs failed '
+ 'with error code 400 != 200'):
+ verify_rest_response(mock_response, '/api/v1/jobs')
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@submarine.apache.org
For additional commands, e-mail: dev-help@submarine.apache.org