Posted to dev@liminal.apache.org by GitBox <gi...@apache.org> on 2021/07/20 07:42:39 UTC

[GitHub] [incubator-liminal] aviemzur commented on a change in pull request #59: [LIMINAL-56] add default executor (k8s) for spark

aviemzur commented on a change in pull request #59:
URL: https://github.com/apache/incubator-liminal/pull/59#discussion_r672874910



##########
File path: examples/spark-ml-app-demo/k8s/serving.py
##########
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+
+import model_store
+from model_store import ModelStore
+
+_MODEL_STORE = ModelStore(model_store.PRODUCTION)
+_PETAL_WIDTH = 'petal_width'
+
+
+def predict(input_json):
+    print(f'input_json={input_json}')
+    input_dict = json.loads(input_json)
+    model, version = _MODEL_STORE.load_latest_model()
+    result = str(model.predict_proba([[float(input_dict[_PETAL_WIDTH])]])[0][1])
+    print(f'result={result}')
+    return result
+
+
+def healthcheck(self):

Review comment:
       Align with changes on master

##########
File path: examples/aws-ml-app-demo/serving.py
##########
@@ -8,28 +25,13 @@
 
 
 def predict(input_json):
-    try:
-        input_dict = json.loads(input_json)
-        model, version = _MODEL_STORE.load_latest_model()
-        result = str(model.predict_proba([[float(input_dict[_PETAL_WIDTH])]])[0][1])
-        return json.dumps({"result": result, "version": version})
-
-    except IndexError:
-        return 'Failure: the model is not ready yet'
-
-    except Exception as e:
-        print(e)
-        return 'Failure'
+    print(f'input_json={input_json}')

Review comment:
       Why was this change made?
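For comparison, the removed lines caught `IndexError` when no model was stored yet and returned the model version alongside the result. Below is a self-contained sketch that keeps that behavior while adding the new input logging. `FakeModel` and `FakeModelStore` are hypothetical stand-ins for the real model and `model_store.ModelStore`, whose API is only inferred from this diff. The store is passed as a parameter purely to make the sketch testable.

```python
import json
import logging

_PETAL_WIDTH = 'petal_width'
_LOG = logging.getLogger(__name__)


class FakeModel:
    """Hypothetical stand-in for a fitted classifier with predict_proba."""

    def predict_proba(self, rows):
        # Return a fixed two-class probability pair per input row.
        return [[0.25, 0.75] for _ in rows]


class FakeModelStore:
    """Hypothetical stand-in for model_store.ModelStore."""

    def __init__(self, models):
        self._models = models  # list of (model, version) tuples

    def load_latest_model(self):
        # Raises IndexError when nothing is stored yet, which is the
        # case the removed except-clause reported as "not ready".
        return self._models[-1]


def predict(input_json, store):
    _LOG.info('input_json=%s', input_json)
    try:
        input_dict = json.loads(input_json)
        model, version = store.load_latest_model()
        result = str(model.predict_proba([[float(input_dict[_PETAL_WIDTH])]])[0][1])
        return json.dumps({'result': result, 'version': version})
    except IndexError:
        return 'Failure: the model is not ready yet'
    except Exception:
        # Log the traceback instead of printing the bare exception.
        _LOG.exception('prediction failed')
        return 'Failure'
```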

##########
File path: tests/runners/airflow/tasks/test_spark_task.py
##########
@@ -111,26 +210,28 @@ def test_partially_missing_spark_arguments(self):
                     '--class',
                     'org.apache.liminal.MySparkApp',
                     '--conf',
-                    'spark.driver.memory=1g',
-                    '--conf',
                     'spark.driver.maxResultSize=1g',
                     '--conf',
+                    'spark.driver.memory=1g',
+                    '--conf',
                     'spark.yarn.executor.memoryOverhead=500M',
                     'my_app.py',
                     '--query',
                     'select * from dlk_visitor_funnel_dwh_staging.fact_events where '
                     "unified_Date_prt >= '{{yesterday_ds}}'",
                     '--output',
-                    'mytable'].sort()
+                    'mytable']
 
         actual = SparkTask(
             'my_spark_task',
             DummyDag('dag-id', 'my_spark_task'),
             [],
             trigger_rule='all_success',
             liminal_config={},
-            pipeline_config={},
+            pipeline_config={'pipeline': 'pipeline'},
             task_config=task_config
         ).get_runnable_command()
 
-        self.assertEqual(actual.sort(), expected)
+        print(actual)

Review comment:
       Use logging instead of printing
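A minimal sketch of the suggestion, using the standard `logging` module and restoring an assertion in place of the `print`. `get_runnable_command` here is a hypothetical stand-in for `SparkTask(...).get_runnable_command()`. Note that `list.sort()` sorts in place and returns `None`, so the removed `self.assertEqual(actual.sort(), expected)` (with `expected` itself bound to `[...].sort()`) compared `None` with `None` and could never fail.

```python
import logging
import unittest

_LOG = logging.getLogger(__name__)


def get_runnable_command():
    """Hypothetical stand-in for SparkTask(...).get_runnable_command()."""
    return ['spark-submit', '--conf', 'spark.driver.maxResultSize=1g',
            '--conf', 'spark.driver.memory=1g', 'my_app.py']


class TestSparkCommand(unittest.TestCase):
    def test_command(self):
        expected = ['spark-submit', '--conf', 'spark.driver.maxResultSize=1g',
                    '--conf', 'spark.driver.memory=1g', 'my_app.py']
        actual = get_runnable_command()
        # Debug output goes through logging, so it is silent unless configured.
        _LOG.debug('actual command: %s', actual)
        # Compare the lists themselves, not the None returned by list.sort().
        self.assertEqual(actual, expected)
```

If argument order is not guaranteed, `sorted(actual) == sorted(expected)` is an option, but it cannot detect a value that moved to a different `--conf` flag.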

##########
File path: examples/spark-ml-app-demo/emr/archetype/liminal.yml
##########
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+# superliminal for local development
+name: InfraSpark
+type: super
+variables:
+  region: us-east-1
+  core_count: 2
+  emr_version: 'emr-6.2.0'
+  template_url: 'https://s3.amazonaws.com/myorg/emr-template.yml'
+  cluster_name: 'spark-ml-app-demo-liminal'
+  spark_output_path: 'myorg/'

Review comment:
       Shouldn't this be an S3 path?

##########
File path: examples/spark-ml-app-demo/k8s/liminal.yml
##########
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+name: MyFirstLiminalSparkApp
+super: InfraSpark
+owner: Bosco Albert Baracus
+variables:
+  dataset_path: '{{spark_output_path}}/outputs/'
+  application: data_cleanup.py
+volumes:
+  - volume: gettingstartedvol
+    claim_name: gettingstartedvol-pvc
+    local:
+      path: .
+services:
+  - service:
+    name: my_datascience_server
+    type: python_server
+    description: my ds server
+    image: myorg/mydatascienceapp
+    source: .
+    endpoints:
+      - endpoint: /predict
+        module: serving
+        function: predict
+      - endpoint: /healthcheck
+        module: serving
+        function: healthcheck
+task_defaults:
+  python:
+    mounts:
+      - mount: mymount
+        volume: gettingstartedvol
+        path: /mnt/gettingstartedvol
+pipelines:
+  - pipeline: my_first_pipeline
+    start_date: 1970-01-01
+    timeout_minutes: 45
+    schedule: 0 * 1 * *
+    tasks:
+      - task: data_preprocessing
+        type: spark
+        description: prepare the data for training
+        application_arguments:
+          - 'data/iris.csv'

Review comment:
       Shouldn't this be an S3 path when the EMR executor is used? Perhaps you can add a prefix here, defined by a variable in the parent.
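A sketch of the prefix idea, assuming each deployment target has its own superliminal that defines a hypothetical `data_prefix` variable, so the child could use `'{{data_prefix}}/iris.csv'`. The `render` and `merge_variables` helpers are a simplified stand-in for `{{name}}` substitution, not Liminal's implementation. Unknown placeholders are left untouched so runtime macros such as `{{yesterday_ds}}` survive.

```python
import re

# Matches {{name}} placeholders, the style used in the liminal.yml files above.
_VAR = re.compile(r'\{\{\s*(\w+)\s*\}\}')


def merge_variables(parent, child):
    """Child variables override parent (superliminal) variables of the same name."""
    merged = dict(parent)
    merged.update(child)
    return merged


def render(value, variables):
    """Substitute known {{name}} placeholders and leave unknown ones as-is."""
    return _VAR.sub(lambda m: str(variables.get(m.group(1), m.group(0))), value)


# Hypothetical parents: the EMR one points at S3, the k8s one at a local dir.
EMR_PARENT = {'data_prefix': 's3://my-bucket/spark-ml-app-demo'}
K8S_PARENT = {'data_prefix': 'data'}

emr_arg = render('{{data_prefix}}/iris.csv', merge_variables(EMR_PARENT, {}))
k8s_arg = render('{{data_prefix}}/iris.csv', merge_variables(K8S_PARENT, {}))
```

With the k8s parent this reproduces the current `'data/iris.csv'`, so only the superliminal changes between targets.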

##########
File path: tests/runners/airflow/tasks/test_spark_task.py
##########
@@ -57,26 +154,27 @@ def test_get_runnable_command(self):
             [],
             trigger_rule='all_success',
             liminal_config={},
-            pipeline_config={},
+            pipeline_config={'pipeline': 'pipeline'},
             task_config=task_config
         ).get_runnable_command()
 
         self.assertEqual(actual.sort(), expected.sort())
 
+
     def test_missing_spark_arguments(self):
         task_config = {
             'application_source': 'my_app.py',
             'application_arguments': {
-                '--query': "select * from dlk_visitor_funnel_dwh_staging.fact_events where unified_Date_prt >= "
-                           "'{{yesterday_ds}}'",
+                '--query': "select * from dlk_visitor_funnel_dwh_staging.fact_events where"
+                           " unified_Date_prt >= '{{yesterday_ds}}'",
                 '--output': 'mytable'
             }
         }
 
         expected = ['spark-submit', 'my_app.py',
                     '--query',
-                    "select * from dlk_visitor_funnel_dwh_staging.fact_events where unified_Date_prt >="
-                    " '{{yesterday_ds}}'",
+                    "select * from dlk_visitor_funnel_dwh_staging.fact_events where "

Review comment:
       Can we make the table, column, and database names more generic? It looks like this was lifted from a fork.

##########
File path: examples/aws-ml-app-demo/liminal.yml
##########
@@ -20,17 +37,14 @@ services:
       - endpoint: /healthcheck
         module: serving
         function: healthcheck
-      - endpoint: /version

Review comment:
       Why was this endpoint deleted?

##########
File path: examples/spark-ml-app-demo/emr/archetype/liminal.yml
##########
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+# superliminal for local development
+name: InfraSpark
+type: super
+variables:
+  region: us-east-1
+  core_count: 2
+  emr_version: 'emr-6.2.0'
+  template_url: 'https://s3.amazonaws.com/myorg/emr-template.yml'
+  cluster_name: 'spark-ml-app-demo-liminal'
+  spark_output_path: 'myorg/'
+executors:
+  - executor: emr-executor
+    cluster_name: '{{cluster_name}}'
+    type: emr
+  - executor: k8s-executor
+    type: kubernetes
+task_defaults:
+  spark:
+    executor: emr-executor
+    master: yarn
+    deploy_mode: cluster
+    application_source: 'myorg/{{application}}'

Review comment:
       Shouldn't this be an S3 path? We could build one from a configurable bucket name.
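A sketch of building the path from a configurable bucket. In the YAML this might look like `application_source: 's3://{{bucket}}/{{application}}'`, where `bucket` is a hypothetical superliminal variable. The `s3_path` helper below is also hypothetical. It trims slashes at part boundaries because plain string substitution doubles them: with `spark_output_path: 'myorg/'`, the template `'{{spark_output_path}}/outputs/'` renders as `myorg//outputs/`.

```python
def s3_path(bucket, *parts):
    """Build an s3:// URI from a bucket name and key parts (hypothetical helper)."""
    if not bucket or '/' in bucket:
        raise ValueError(f'invalid S3 bucket name: {bucket!r}')
    # Trim leading/trailing slashes on each part and skip empty parts,
    # so 'myorg/' followed by 'data_cleanup.py' joins with a single '/'.
    key = '/'.join(p.strip('/') for p in parts if p.strip('/'))
    return f's3://{bucket}/{key}'
```

Trailing slashes are dropped as well, so callers that need a directory-style key should append `/` themselves.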

##########
File path: tests/runners/airflow/tasks/test_spark_task.py
##########
@@ -15,17 +15,108 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
+import os
+import tempfile
 from unittest import TestCase
 
+from liminal.build import liminal_apps_builder
+from liminal.kubernetes import volume_util
 from liminal.runners.airflow import DummyDag
+from liminal.runners.airflow.executors.kubernetes import KubernetesPodExecutor
 from liminal.runners.airflow.tasks.spark import SparkTask
+from tests.util import dag_test_utils
 
 
 class TestSparkTask(TestCase):
     """
     Test Spark Task
     """
 
+    _VOLUME_NAME = 'myvol1'
+
+    def test_spark_on_k8s(self):

Review comment:
       Let's add a similar test for the Spark image builder: without Airflow, but with Docker.

##########
File path: examples/spark-ml-app-demo/k8s/liminal.yml
##########
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+name: MyFirstLiminalSparkApp
+super: InfraSpark
+owner: Bosco Albert Baracus
+variables:
+  dataset_path: '{{spark_output_path}}/outputs/'

Review comment:
       Suggest rename `outputs` to `my_first_liminal_spark_app_outputs`

##########
File path: liminal/build/image/spark/spark.py
##########
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+
+from liminal.build.python import BasePythonImageBuilder
+
+
+class SparkImageBuilder(BasePythonImageBuilder):
+    def __init__(self, config, base_path, relative_source_path, tag):
+        super().__init__(config, base_path, relative_source_path, tag)
+
+    @staticmethod
+    def _dockerfile_path():
+        return os.path.join(os.path.dirname(__file__), 'Dockerfile')
+
+    def _additional_files_from_filename_content_pairs(self):
+        with open(self._dockerfile_path()) as original:
+            data = original.read()
+
+        data = self.__add_pip_install(data)
+        data = self._mount_pip_conf(data)
+
+        return [('Dockerfile', data)]
+
+    @staticmethod
+    def __add_pip_install(data):
+
+        if "" in data:

Review comment:
       The condition should check whether a requirements file exists.
       But since this uses the Python image builder as its parent, I suppose it always has a `requirements.txt` file? If so, we can always run the pip install in the Dockerfile and don't need a condition.
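As written, `if "" in data:` is always true, since the empty string is a substring of every string, so the branch never filters anything. A sketch of the file-existence check, assuming a hypothetical `{{pip_install}}` placeholder line in the Spark Dockerfile template; the real template and placeholder may differ.

```python
import os

# Hypothetical placeholder line in the Spark Dockerfile template.
PIP_PLACEHOLDER = '{{pip_install}}'
PIP_INSTALL = 'RUN pip install -r requirements.txt'


def render_pip_install(dockerfile, has_requirements):
    """Replace the placeholder with a pip install step, or remove it."""
    return dockerfile.replace(PIP_PLACEHOLDER, PIP_INSTALL if has_requirements else '')


def add_pip_install(dockerfile, source_dir):
    """Install requirements only when source_dir contains requirements.txt."""
    has_requirements = os.path.isfile(os.path.join(source_dir, 'requirements.txt'))
    return render_pip_install(dockerfile, has_requirements)
```

If the parent builder does guarantee a `requirements.txt`, the check collapses to always writing `PIP_INSTALL`, which is the simpler option raised above.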




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
