You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@submarine.apache.org by li...@apache.org on 2020/03/05 01:25:34 UTC

[submarine] branch master updated: SUBMARINE-377. [SDK] Submit a job by pysubamrine

This is an automated email from the ASF dual-hosted git repository.

liuxun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git


The following commit(s) were added to refs/heads/master by this push:
     new 39f7a27  SUBMARINE-377. [SDK] Submit a job by pysubamrine
39f7a27 is described below

commit 39f7a275f489de53d16dddc7ea206c30f66a1ed1
Author: pingsutw <pi...@gmail.com>
AuthorDate: Tue Mar 3 17:47:05 2020 +0800

    SUBMARINE-377. [SDK] Submit a job by pysubamrine
    
    ### What is this PR for?
    Use python SDK to submit a submarine job.
    
    ### What type of PR is it?
    [Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    https://issues.apache.org/jira/browse/SUBMARINE-377
    
    ### How should this be tested?
    https://travis-ci.org/pingsutw/hadoop-submarine/builds/648336769
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: pingsutw <pi...@gmail.com>
    
    Closes #179 from pingsutw/SUBMARINE-377 and squashes the following commits:
    
    d39a1c0 [pingsutw] Fix the typo
    851b0eb [pingsutw] SUBMARINE-377. [SDK] Submit a job by pysubamrine
---
 submarine-sdk/pysubmarine/example/client/README.md | 32 ++++++++++
 .../pysubmarine/example/client/mnist.json          | 31 ++++++++++
 submarine-sdk/pysubmarine/setup.py                 |  1 +
 submarine-sdk/pysubmarine/submarine/exceptions.py  | 10 +++
 .../submarine/{exceptions.py => job/__init__.py}   | 12 +---
 .../{exceptions.py => job/submarine_job_client.py} | 31 +++++++---
 .../pysubmarine/submarine/utils/rest_utils.py      | 72 ++++++++++++++++++++++
 .../pysubmarine/tests/client/test_client.py        | 48 +++++++++++++++
 .../pysubmarine/tests/utils/test_rest_utils.py     | 59 ++++++++++++++++++
 9 files changed, 278 insertions(+), 18 deletions(-)

diff --git a/submarine-sdk/pysubmarine/example/client/README.md b/submarine-sdk/pysubmarine/example/client/README.md
new file mode 100644
index 0000000..4095a7c
--- /dev/null
+++ b/submarine-sdk/pysubmarine/example/client/README.md
@@ -0,0 +1,32 @@
+<!---
+  Licensed under the Apache License, Version 2.0 (the "License");
+  you may not use this file except in compliance with the License.
+  You may obtain a copy of the License at
+   http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License. See accompanying LICENSE file.
+-->
+
+# Submarine Client API Example
+This example shows how you could use Submarine Client API to 
+manage submarine tasks
+
+## Prerequisites
+- [Deploy Submarine Server on Kubernetes](https://github.com/apache/submarine/blob/master/docs/submarine-server/setup-kubernetes.md)
+- [Deploy Tensorflow Operator on Kubernetes](https://github.com/apache/submarine/blob/master/docs/submarine-server/ml-frameworks/tensorflow.md)
+
+## Submit Job
+1. Create a job description for submarine client. e.g.[mnist.json](./mnist.json)
+
+2. Create Submarine job client
+```python
+from submarine.job import SubmarineJOBClient
+client = SubmarineJOBClient('localhost', 8080)
+```
+3. Submit job
+```python
+response = client.submit_job('mnist.json')
+```
diff --git a/submarine-sdk/pysubmarine/example/client/mnist.json b/submarine-sdk/pysubmarine/example/client/mnist.json
new file mode 100644
index 0000000..d84c966
--- /dev/null
+++ b/submarine-sdk/pysubmarine/example/client/mnist.json
@@ -0,0 +1,31 @@
+{
+  "name": "mnist",
+  "librarySpec": {
+    "name": "TensorFlow",
+    "version": "2.1.0",
+    "image": "gcr.io/kubeflow-ci/tf-mnist-with-summaries:1.0",
+    "cmd": "python /var/tf_mnist/mnist_with_summaries.py --log_dir=/train/log --learning_rate=0.01 --batch_size=150",
+    "envVars": {
+      "ENV_1": "ENV1"
+    }
+  },
+  "submitterSpec": {
+    "type": "k8s",
+    "configPath": null,
+    "namespace": "submarine",
+    "kind": "TFJob",
+    "apiVersion": "kubeflow.org/v1"
+  },
+  "taskSpecs": {
+    "Ps": {
+      "name": "tensorflow",
+      "replicas": 1,
+      "resources": "cpu=1,memory=1024M"
+    },
+    "Worker": {
+      "name": "tensorflow",
+      "replicas": 1,
+      "resources": "cpu=1,memory=1024M"
+    }
+  }
+}
diff --git a/submarine-sdk/pysubmarine/setup.py b/submarine-sdk/pysubmarine/setup.py
index c03f2c1..8e4aac2 100644
--- a/submarine-sdk/pysubmarine/setup.py
+++ b/submarine-sdk/pysubmarine/setup.py
@@ -30,6 +30,7 @@ setup(
         'sqlparse',
         'pymysql',
         'tensorflow>=1.14.0,<2.0.0',
+        'requests'
     ],
     classifiers=[
         'Intended Audience :: Developers',
diff --git a/submarine-sdk/pysubmarine/submarine/exceptions.py b/submarine-sdk/pysubmarine/submarine/exceptions.py
index 6ce1bca..3d760fd 100644
--- a/submarine-sdk/pysubmarine/submarine/exceptions.py
+++ b/submarine-sdk/pysubmarine/submarine/exceptions.py
@@ -24,3 +24,13 @@ class SubmarineException(Exception):
         """
         self.message = message
         super(SubmarineException, self).__init__(message)
+
+
+class RestException(SubmarineException):
+    """Exception thrown on non 200-level responses from the REST API"""
+    def __init__(self, json):
+        error_code = json.get('error_code')
+        message = "%s: %s" % (error_code,
+                              json['message'] if 'message' in json else "Response: " + str(json))
+        super(RestException, self).__init__(message)
+        self.json = json
diff --git a/submarine-sdk/pysubmarine/submarine/exceptions.py b/submarine-sdk/pysubmarine/submarine/job/__init__.py
similarity index 68%
copy from submarine-sdk/pysubmarine/submarine/exceptions.py
copy to submarine-sdk/pysubmarine/submarine/job/__init__.py
index 6ce1bca..3eba92f 100644
--- a/submarine-sdk/pysubmarine/submarine/exceptions.py
+++ b/submarine-sdk/pysubmarine/submarine/job/__init__.py
@@ -13,14 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from submarine.job.submarine_job_client import SubmarineJOBClient
 
-class SubmarineException(Exception):
-    """
-    Generic exception thrown to surface failure information about external-facing operations.
-    """
-    def __init__(self, message):
-        """
-        :param message: The message describing the error that occured.
-        """
-        self.message = message
-        super(SubmarineException, self).__init__(message)
+__all__ = ['SubmarineJOBClient']
diff --git a/submarine-sdk/pysubmarine/submarine/exceptions.py b/submarine-sdk/pysubmarine/submarine/job/submarine_job_client.py
similarity index 51%
copy from submarine-sdk/pysubmarine/submarine/exceptions.py
copy to submarine-sdk/pysubmarine/submarine/job/submarine_job_client.py
index 6ce1bca..6534ae2 100644
--- a/submarine-sdk/pysubmarine/submarine/exceptions.py
+++ b/submarine-sdk/pysubmarine/submarine/job/submarine_job_client.py
@@ -13,14 +13,29 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from submarine.utils.rest_utils import http_request
+import logging
+import json
 
-class SubmarineException(Exception):
-    """
-    Generic exception thrown to surface failure information about external-facing operations.
-    """
-    def __init__(self, message):
+_logger = logging.getLogger(__name__)
+
+_PATH_PREFIX = "/api/v1/"
+JOBS = 'jobs'
+
+
+class SubmarineJOBClient:
+    def __init__(self, hostname, port):
+        self.base_url = 'http://' + hostname + ':' + str(port)
+
+    def submit_job(self, conf_path):
         """
-        :param message: The message describing the error that occured.
+        Submit a job to submarine server
+        :param conf_path: The location of the configuration file
+        :return: requests.Response
         """
-        self.message = message
-        super(SubmarineException, self).__init__(message)
+        endpoint = _PATH_PREFIX + JOBS
+        with open(conf_path) as json_file:
+            json_body = json.load(json_file)
+        response = http_request(self.base_url, endpoint=endpoint,
+                                method='POST', json_body=json_body)
+        return response
diff --git a/submarine-sdk/pysubmarine/submarine/utils/rest_utils.py b/submarine-sdk/pysubmarine/submarine/utils/rest_utils.py
new file mode 100644
index 0000000..4879954
--- /dev/null
+++ b/submarine-sdk/pysubmarine/submarine/utils/rest_utils.py
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from submarine.exceptions import SubmarineException, RestException
+import json
+import requests
+import logging
+
+_logger = logging.getLogger(__name__)
+
+
+def http_request(base_url, endpoint, method, json_body,
+                 timeout=60, headers=None, **kwargs):
+    """
+    Perform requests.
+    :param base_url: http request base url containing hostname and port. e.g. https://submarine:8088
+    :param endpoint: specified as a relative or absolute url
+    :param method: http request method
+    :param json_body: request json body, for `application/json`
+    :param timeout: How many seconds to wait for the server to send data
+    :param headers: Dictionary of HTTP Headers to send with the :class:`Request`.
+    :return:
+    """
+    method = method.upper()
+    assert method in ['GET', 'HEAD', 'DELETE', 'POST', 'PUT',
+                      'PATCH', 'OPTIONS']
+    headers = headers or {}
+    if 'Content-Type' not in headers:
+        headers['Content-Type'] = 'application/json'
+
+    url = base_url + endpoint
+    response = requests.request(url=url, method=method, json=json_body, headers=headers,
+                                timeout=timeout, **kwargs)
+    verify_rest_response(response, endpoint)
+
+    print('test')
+    print(response.text)
+    response = json.loads(response.text)
+    result = response['result']
+    return result
+
+
+def _can_parse_as_json(string):
+    try:
+        json.loads(string)
+        return True
+    except Exception:  # pylint: disable=broad-except
+        return False
+
+
+def verify_rest_response(response, endpoint):
+    """Verify the return code and raise exception if the request was not successful."""
+    if response.status_code != 200:
+        if _can_parse_as_json(response.text):
+            raise RestException(json.loads(response.text))
+        else:
+            base_msg = "API request to endpoint %s failed with error code " \
+                       "%s != 200" % (endpoint, response.status_code)
+            raise SubmarineException("%s. Response body: '%s'" % (base_msg, response.text))
+    return response
diff --git a/submarine-sdk/pysubmarine/tests/client/test_client.py b/submarine-sdk/pysubmarine/tests/client/test_client.py
new file mode 100644
index 0000000..0f4c9c2
--- /dev/null
+++ b/submarine-sdk/pysubmarine/tests/client/test_client.py
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from submarine.job import SubmarineJOBClient
+import mock
+import pytest
+import json
+
+
+@pytest.fixture(scope="function")
+def output_json_filepath():
+    dummy = {'a': 200, 'b': 2, 'c': 3}
+    path = '/tmp/data.json'
+    with open(path, 'w') as f:
+        json.dump(dummy, f)
+    return path
+
+
+@mock.patch('submarine.job.submarine_job_client.http_request')
+class TestSubmarineJobClient:
+    def test_submit_job(self, mock_http_request, output_json_filepath):
+        client = SubmarineJOBClient('submarine', 8080)
+        mock_http_request.return_value = {'jobId': 'job_1582524742595_0040',
+                                          'name': 'submarine', 'identifier': 'test'}
+        response = client.submit_job(output_json_filepath)
+
+        with open(output_json_filepath) as json_file:
+            json_body = json.load(json_file)
+
+        mock_http_request.assert_called_with('http://submarine:8080',
+                                             json_body=json_body,
+                                             endpoint='/api/v1/jobs', method='POST')
+
+        assert response['jobId'] == 'job_1582524742595_0040'
+        assert response['name'] == 'submarine'
+        assert response['identifier'] == 'test'
diff --git a/submarine-sdk/pysubmarine/tests/utils/test_rest_utils.py b/submarine-sdk/pysubmarine/tests/utils/test_rest_utils.py
new file mode 100644
index 0000000..de5ff5e
--- /dev/null
+++ b/submarine-sdk/pysubmarine/tests/utils/test_rest_utils.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from mock import patch, Mock
+import pytest
+import json
+from submarine.utils.rest_utils import http_request, verify_rest_response
+from submarine.exceptions import RestException, SubmarineException
+
+
+def test_http_request():
+    dummy_json = json.dumps({'result': {'jobId': 'job_1234567', 'name': 'submarine',
+                                        'identifier': 'test'}})
+
+    with patch('requests.request') as mock_requests:
+        mock_requests.return_value.text = dummy_json
+        mock_requests.return_value.status_code = 200
+
+        result = http_request('http://submarine:8080', json_body='dummy',
+                              endpoint='/api/v1/jobs', method='POST')
+
+    assert result['jobId'] == 'job_1234567'
+    assert result['name'] == 'submarine'
+    assert result['identifier'] == 'test'
+
+
+def test_verify_rest_response():
+    # Test correct response
+    mock_response = Mock()
+    mock_response.status_code = 200
+    verify_rest_response(mock_response, '/api/v1/jobs')
+
+    # Test response status code not equal 200(OK) and response can parse as JSON
+    mock_response.status_code = 400
+    mock_json_body = {'a': 200, 'b': 2, 'c': 3}
+    dummy_json = json.dumps(mock_json_body)
+    mock_response.text = dummy_json
+
+    with pytest.raises(RestException, match=str(mock_json_body)):
+        verify_rest_response(mock_response, '/api/v1/jobs')
+
+    # Test response status code not equal 200(OK) and response can not parse as JSON
+    mock_json_body = 'test, 123'
+    mock_response.text = mock_json_body
+    with pytest.raises(SubmarineException, match='API request to endpoint /api/v1/jobs failed '
+                                                 'with error code 400 != 200'):
+        verify_rest_response(mock_response, '/api/v1/jobs')


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@submarine.apache.org
For additional commands, e-mail: dev-help@submarine.apache.org