Posted to dev@liminal.apache.org by GitBox <gi...@apache.org> on 2021/07/19 15:28:58 UTC

[GitHub] [incubator-liminal] zionrubin opened a new pull request #59: [LIMINAL-56] - add default executor (k8s) for spark

zionrubin opened a new pull request #59:
URL: https://github.com/apache/incubator-liminal/pull/59


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@liminal.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-liminal] aviemzur commented on a change in pull request #59: [LIMINAL-56] add default executor (k8s) for spark

Posted by GitBox <gi...@apache.org>.
aviemzur commented on a change in pull request #59:
URL: https://github.com/apache/incubator-liminal/pull/59#discussion_r672874910



##########
File path: examples/spark-ml-app-demo/k8s/serving.py
##########
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+
+import model_store
+from model_store import ModelStore
+
+_MODEL_STORE = ModelStore(model_store.PRODUCTION)
+_PETAL_WIDTH = 'petal_width'
+
+
+def predict(input_json):
+    print(f'input_json={input_json}')
+    input_dict = json.loads(input_json)
+    model, version = _MODEL_STORE.load_latest_model()
+    result = str(model.predict_proba([[float(input_dict[_PETAL_WIDTH])]])[0][1])
+    print(f'result={result}')
+    return result
+
+
+def healthcheck(self):

Review comment:
       Align with changes on master

##########
File path: examples/aws-ml-app-demo/serving.py
##########
@@ -8,28 +25,13 @@
 
 
 def predict(input_json):
-    try:
-        input_dict = json.loads(input_json)
-        model, version = _MODEL_STORE.load_latest_model()
-        result = str(model.predict_proba([[float(input_dict[_PETAL_WIDTH])]])[0][1])
-        return json.dumps({"result": result, "version": version})
-
-    except IndexError:
-        return 'Failure: the model is not ready yet'
-
-    except Exception as e:
-        print(e)
-        return 'Failure'
+    print(f'input_json={input_json}')

Review comment:
       Why was this change made?

##########
File path: tests/runners/airflow/tasks/test_spark_task.py
##########
@@ -111,26 +210,28 @@ def test_partially_missing_spark_arguments(self):
                     '--class',
                     'org.apache.liminal.MySparkApp',
                     '--conf',
-                    'spark.driver.memory=1g',
-                    '--conf',
                     'spark.driver.maxResultSize=1g',
                     '--conf',
+                    'spark.driver.memory=1g',
+                    '--conf',
                     'spark.yarn.executor.memoryOverhead=500M',
                     'my_app.py',
                     '--query',
                     'select * from dlk_visitor_funnel_dwh_staging.fact_events where '
                     "unified_Date_prt >= '{{yesterday_ds}}'",
                     '--output',
-                    'mytable'].sort()
+                    'mytable']
 
         actual = SparkTask(
             'my_spark_task',
             DummyDag('dag-id', 'my_spark_task'),
             [],
             trigger_rule='all_success',
             liminal_config={},
-            pipeline_config={},
+            pipeline_config={'pipeline': 'pipeline'},
             task_config=task_config
         ).get_runnable_command()
 
-        self.assertEqual(actual.sort(), expected)
+        print(actual)

Review comment:
       Use logging instead of printing
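Apart from replacing `print` with a logger, the original assertions were weak. `list.sort()` sorts in place and returns None. As a result, `expected = [...].sort()` binds None, and `self.assertEqual(actual.sort(), expected.sort())` compares None with None, so it always passes. Below is a minimal sketch of the pattern. `runnable_command()` is a hypothetical stand-in for `SparkTask(...).get_runnable_command()`.

```python
import logging
import unittest

logger = logging.getLogger(__name__)


def runnable_command():
    # Hypothetical stand-in for SparkTask(...).get_runnable_command().
    return ['spark-submit', '--conf', 'spark.driver.memory=1g', 'my_app.py']


class TestRunnableCommand(unittest.TestCase):

    def test_sort_returns_none(self):
        # list.sort() returns None, so comparing two .sort() results
        # is None == None and can never fail.
        self.assertIsNone(['b', 'a'].sort())

    def test_command_exact(self):
        expected = ['spark-submit', '--conf', 'spark.driver.memory=1g', 'my_app.py']
        actual = runnable_command()
        logger.debug('actual command: %s', actual)  # instead of print(actual)
        # Order matters for spark-submit ('--conf' must precede its value),
        # so comparing the lists exactly is the strictest check.
        self.assertEqual(actual, expected)

    def test_command_ignoring_order(self):
        # If ordering really is not part of the contract, compare sorted
        # copies: sorted() returns a new list, unlike list.sort().
        expected = ['my_app.py', 'spark-submit', 'spark.driver.memory=1g', '--conf']
        self.assertEqual(sorted(runnable_command()), sorted(expected))
```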

##########
File path: examples/spark-ml-app-demo/emr/archetype/liminal.yml
##########
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+# superliminal for local development
+name: InfraSpark
+type: super
+variables:
+  region: us-east-1
+  core_count: 2
+  emr_version: 'emr-6.2.0'
+  template_url: 'https://s3.amazonaws.com/myorg/emr-template.yml'
+  cluster_name: 'spark-ml-app-demo-liminal'
+  spark_output_path: 'myorg/'

Review comment:
       Shouldn't this be an S3 path?
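If the EMR archetype is meant to write to S3, one inexpensive guard is to validate path variables when the config is loaded. The sketch below uses `urllib.parse.urlparse`, which reports the scheme as `s3` and places the bucket in `netloc`. `is_s3_uri` and the `_path` naming rule are hypothetical and not part of liminal.

```python
from urllib.parse import urlparse


def is_s3_uri(path):
    """Return True for paths of the form s3://<bucket>[/<key>]."""
    parsed = urlparse(path)
    # urlparse puts the bucket in netloc and the key in path.
    return parsed.scheme == 's3' and bool(parsed.netloc)


# Hypothetical check over the archetype's variables, as parsed from YAML.
variables = {'spark_output_path': 'myorg/'}
invalid = [name for name, value in variables.items()
           if name.endswith('_path') and not is_s3_uri(value)]
```

Under this check, `'myorg/'` is flagged and `'s3://myorg/outputs/'` would pass.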

##########
File path: examples/spark-ml-app-demo/k8s/liminal.yml
##########
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+name: MyFirstLiminalSparkApp
+super: InfraSpark
+owner: Bosco Albert Baracus
+variables:
+  dataset_path: '{{spark_output_path}}/outputs/'
+  application: data_cleanup.py
+volumes:
+  - volume: gettingstartedvol
+    claim_name: gettingstartedvol-pvc
+    local:
+      path: .
+services:
+  - service:
+    name: my_datascience_server
+    type: python_server
+    description: my ds server
+    image: myorg/mydatascienceapp
+    source: .
+    endpoints:
+      - endpoint: /predict
+        module: serving
+        function: predict
+      - endpoint: /healthcheck
+        module: serving
+        function: healthcheck
+task_defaults:
+  python:
+    mounts:
+      - mount: mymount
+        volume: gettingstartedvol
+        path: /mnt/gettingstartedvol
+pipelines:
+  - pipeline: my_first_pipeline
+    start_date: 1970-01-01
+    timeout_minutes: 45
+    schedule: 0 * 1 * *
+    tasks:
+      - task: data_preprocessing
+        type: spark
+        description: prepare the data for training
+        application_arguments:
+          - 'data/iris.csv'

Review comment:
       Shouldn't this be an S3 path in the case of the EMR executor? Perhaps you could add a prefix here, defined by a variable in the parent (superliminal).
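One way to read this suggestion: the parent defines a prefix variable, such as the `input_root_dir` that the k8s archetype later in this thread sets to `''`, and the child writes `'{{input_root_dir}}data/iris.csv'`. The sketch below shows how such `{{name}}` placeholders could expand per executor. `render` is a hypothetical helper, not liminal's variable-substitution code, and the bucket name is a placeholder. Unknown names are left intact so that runtime templates like `{{yesterday_ds}}` are not disturbed.

```python
import re

_PLACEHOLDER = re.compile(r'\{\{\s*(\w+)\s*\}\}')


def render(template, variables):
    """Replace {{name}} placeholders with values from `variables`.

    Names not in `variables` are kept as-is, so templates resolved later
    (for example Airflow's {{yesterday_ds}}) pass through untouched.
    """
    def substitute(match):
        name = match.group(1)
        return str(variables[name]) if name in variables else match.group(0)

    return _PLACEHOLDER.sub(substitute, template)


# Hypothetical parent variables for each executor flavour.
EMR_PARENT = {'input_root_dir': 's3://my-bucket/'}
K8S_PARENT = {'input_root_dir': ''}
```

With these sketch values, the same child argument resolves to `s3://my-bucket/data/iris.csv` under the EMR parent and to `data/iris.csv` under the k8s parent.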

##########
File path: tests/runners/airflow/tasks/test_spark_task.py
##########
@@ -57,26 +154,27 @@ def test_get_runnable_command(self):
             [],
             trigger_rule='all_success',
             liminal_config={},
-            pipeline_config={},
+            pipeline_config={'pipeline': 'pipeline'},
             task_config=task_config
         ).get_runnable_command()
 
         self.assertEqual(actual.sort(), expected.sort())
 
+
     def test_missing_spark_arguments(self):
         task_config = {
             'application_source': 'my_app.py',
             'application_arguments': {
-                '--query': "select * from dlk_visitor_funnel_dwh_staging.fact_events where unified_Date_prt >= "
-                           "'{{yesterday_ds}}'",
+                '--query': "select * from dlk_visitor_funnel_dwh_staging.fact_events where"
+                           " unified_Date_prt >= '{{yesterday_ds}}'",
                 '--output': 'mytable'
             }
         }
 
         expected = ['spark-submit', 'my_app.py',
                     '--query',
-                    "select * from dlk_visitor_funnel_dwh_staging.fact_events where unified_Date_prt >="
-                    " '{{yesterday_ds}}'",
+                    "select * from dlk_visitor_funnel_dwh_staging.fact_events where "

Review comment:
       Can we make the table/column/database names more generic? Looks like this was lifted from a fork.
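Below is a sketch of the test input using generic, hypothetical names (`my_db.my_table`, `date_partition`, `my_output_table`). It also checks the reflow in this hunk. The diff moves the space from the end of one string piece to the start of the next, and because Python joins adjacent string literals at compile time, both splits produce the same query.

```python
# Generic, hypothetical identifiers in place of the fork-specific ones.
QUERY_SPLIT_BEFORE = ("select * from my_db.my_table where date_partition >= "
                      "'{{yesterday_ds}}'")
QUERY_SPLIT_AFTER = ("select * from my_db.my_table where"
                     " date_partition >= '{{yesterday_ds}}'")

task_config = {
    'application_source': 'my_app.py',
    'application_arguments': {
        '--query': QUERY_SPLIT_AFTER,
        '--output': 'my_output_table',
    },
}
```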

##########
File path: examples/aws-ml-app-demo/liminal.yml
##########
@@ -20,17 +37,14 @@ services:
       - endpoint: /healthcheck
         module: serving
         function: healthcheck
-      - endpoint: /version

Review comment:
       Why was this endpoint deleted?

##########
File path: examples/spark-ml-app-demo/emr/archetype/liminal.yml
##########
@@ -0,0 +1,67 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+# superliminal for local development
+name: InfraSpark
+type: super
+variables:
+  region: us-east-1
+  core_count: 2
+  emr_version: 'emr-6.2.0'
+  template_url: 'https://s3.amazonaws.com/myorg/emr-template.yml'
+  cluster_name: 'spark-ml-app-demo-liminal'
+  spark_output_path: 'myorg/'
+executors:
+  - executor: emr-executor
+    cluster_name: '{{cluster_name}}'
+    type: emr
+  - executor: k8s-executor
+    type: kubernetes
+task_defaults:
+  spark:
+    executor: emr-executor
+    master: yarn
+    deploy_mode: cluster
+    application_source: 'myorg/{{application}}'

Review comment:
       Shouldn't this be an S3 path? We could build one from a configurable bucket name.
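One way to build the path from a configurable bucket: in the YAML, a parent variable (hypothetically `s3_bucket`) could be used as `application_source: 's3://{{s3_bucket}}/{{application}}'`. The joining logic is sketched below in Python. Note that with `spark_output_path: 'myorg/'`, the child's `'{{spark_output_path}}/outputs/'` would render with a doubled slash, which a helper like this avoids. `s3_uri` is hypothetical, and its bucket check is deliberately minimal.

```python
def s3_uri(bucket, *parts):
    """Join a bucket name and key parts into s3://bucket/key without doubled slashes."""
    # Minimal sanity check only; real S3 bucket naming rules are stricter.
    if not bucket or '/' in bucket:
        raise ValueError(f'invalid bucket name: {bucket!r}')
    # Strip slashes at both ends of each part and drop empty parts.
    key = '/'.join(part.strip('/') for part in parts if part.strip('/'))
    return f's3://{bucket}/{key}'
```

Because this sketch strips trailing slashes, an output "directory" such as `s3_uri('myorg', 'outputs/')` comes back as `s3://myorg/outputs`. Re-append the slash if the consuming job expects one.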

##########
File path: tests/runners/airflow/tasks/test_spark_task.py
##########
@@ -15,17 +15,108 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
+
+import os
+import tempfile
 from unittest import TestCase
 
+from liminal.build import liminal_apps_builder
+from liminal.kubernetes import volume_util
 from liminal.runners.airflow import DummyDag
+from liminal.runners.airflow.executors.kubernetes import KubernetesPodExecutor
 from liminal.runners.airflow.tasks.spark import SparkTask
+from tests.util import dag_test_utils
 
 
 class TestSparkTask(TestCase):
     """
     Test Spark Task
     """
 
+    _VOLUME_NAME = 'myvol1'
+
+    def test_spark_on_k8s(self):

Review comment:
       Let's add a similar test for the Spark image builder: without Airflow, but with Docker.
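Below is a rough sketch of such a test. It builds the image directly with `SparkImageBuilder` (no Airflow) and runs a container through the Docker SDK for Python. The constructor arguments follow the signature in this PR's `spark.py`. Several details are assumptions about liminal's builder rather than confirmed API: the `build()` method, the `config` keys, and a container working directory that holds the copied source. The image tag is hypothetical. The test skips itself when liminal, the Docker SDK, or a reachable Docker daemon is missing.

```python
import os
import tempfile
import unittest

try:
    import docker  # Docker SDK for Python
    from docker.errors import ImageNotFound
    from liminal.build.image.spark.spark import SparkImageBuilder
    docker.from_env().ping()  # fails fast if no daemon is reachable
    _AVAILABLE = True
except Exception:  # missing package or no Docker daemon
    _AVAILABLE = False


@unittest.skipUnless(_AVAILABLE, 'requires liminal, the docker SDK and a Docker daemon')
class TestSparkImageBuilder(unittest.TestCase):
    _IMAGE = 'liminal_spark_image_test'  # hypothetical tag

    def setUp(self):
        self.client = docker.from_env()

    def tearDown(self):
        try:
            self.client.images.remove(self._IMAGE, force=True)
        except ImageNotFound:
            pass  # build failed before tagging; nothing to clean up

    def test_build_and_run(self):
        with tempfile.TemporaryDirectory() as base_path:
            with open(os.path.join(base_path, 'hello.py'), 'w') as f:
                f.write("print('hello from spark image')\n")
            # Assumed config keys and build() method; align with the real builder.
            config = {'image': self._IMAGE, 'source': '.'}
            SparkImageBuilder(config=config, base_path=base_path,
                              relative_source_path='.', tag=self._IMAGE).build()

        # Assumes the image's working directory holds the copied source.
        output = self.client.containers.run(self._IMAGE, 'python hello.py', remove=True)
        self.assertIn(b'hello from spark image', output)
```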

##########
File path: examples/spark-ml-app-demo/k8s/liminal.yml
##########
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+name: MyFirstLiminalSparkApp
+super: InfraSpark
+owner: Bosco Albert Baracus
+variables:
+  dataset_path: '{{spark_output_path}}/outputs/'

Review comment:
       Suggest rename `outputs` to `my_first_liminal_spark_app_outputs`

##########
File path: liminal/build/image/spark/spark.py
##########
@@ -0,0 +1,50 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+
+from liminal.build.python import BasePythonImageBuilder
+
+
+class SparkImageBuilder(BasePythonImageBuilder):
+    def __init__(self, config, base_path, relative_source_path, tag):
+        super().__init__(config, base_path, relative_source_path, tag)
+
+    @staticmethod
+    def _dockerfile_path():
+        return os.path.join(os.path.dirname(__file__), 'Dockerfile')
+
+    def _additional_files_from_filename_content_pairs(self):
+        with open(self._dockerfile_path()) as original:
+            data = original.read()
+
+        data = self.__add_pip_install(data)
+        data = self._mount_pip_conf(data)
+
+        return [('Dockerfile', data)]
+
+    @staticmethod
+    def __add_pip_install(data):
+
+        if "" in data:

Review comment:
       The condition should check whether a requirements file exists.
   But since this uses the Python image builder as a parent, I suppose it always has a `requirements.txt` file? If so, we can always run the pip install in the Dockerfile and don't need a condition.
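As written, `if "" in data:` is always true, because the empty string is a substring of every string. Below is a minimal sketch of the requirements-based condition. It assumes a hypothetical `{{PIP_INSTALL}}` placeholder in the Dockerfile template; the real token is not shown in the diff. If the parent builder guarantees `requirements.txt`, the check reduces to always emitting the `RUN` line.

```python
import os

# Hypothetical placeholder token in the Dockerfile template.
PIP_INSTALL_PLACEHOLDER = '{{PIP_INSTALL}}'


def add_pip_install(dockerfile, source_dir):
    """Fill the pip-install placeholder depending on whether requirements.txt exists."""
    if os.path.exists(os.path.join(source_dir, 'requirements.txt')):
        step = 'RUN pip install -r requirements.txt'
    else:
        step = ''  # nothing to install; leave an empty line in the Dockerfile
    return dockerfile.replace(PIP_INSTALL_PLACEHOLDER, step)
```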







[GitHub] [incubator-liminal] zionrubin commented on a change in pull request #59: [LIMINAL-56] add default executor (k8s) for spark

zionrubin commented on a change in pull request #59:
URL: https://github.com/apache/incubator-liminal/pull/59#discussion_r676626911



##########
File path: examples/spark-app-demo/emr/archetype/liminal.yml
##########
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+# superliminal for local development
+name: InfraSpark
+type: super
+variables:
+  region: us-east-1
+  core_count: 2
+  emr_version: 'emr-6.2.0'
+  template_url: 's3://ni-data-infra-dev/liminal-test/emr_cloudformation_template.yml'

Review comment:
       ignore it

##########
File path: examples/spark-app-demo/emr/configs/emr_cloudformation_template.yml
##########
@@ -0,0 +1,55 @@
+AWSTemplateFormatVersion: 2010-09-09

Review comment:
       ignore it







[GitHub] [incubator-liminal] aviemzur merged pull request #59: [LIMINAL-56] add default executor (k8s) for spark

aviemzur merged pull request #59:
URL: https://github.com/apache/incubator-liminal/pull/59


   





[GitHub] [incubator-liminal] aviemzur commented on a change in pull request #59: [LIMINAL-56] add default executor (k8s) for spark

aviemzur commented on a change in pull request #59:
URL: https://github.com/apache/incubator-liminal/pull/59#discussion_r678021528



##########
File path: examples/spark-app-demo/k8s/archetype/liminal.yml
##########
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+# superliminal for local development
+name: InfraSpark
+owner: Bosco Albert Baracus
+type: super
+executors:
+  - executor: k8s
+    type: kubernetes
+variables:
+  output_root_dir: /mnt/gettingstartedvol
+  input_root_dir: ''
+task_defaults:
+  spark:
+    executor: k8s
+    image: my_spark_image
+    source: .

Review comment:
       Need to create an `images` section.
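Below is a minimal sketch of the consistency check that an `images` section makes possible: every image referenced by task defaults, tasks, or services must be declared once under `images`. The schema is an assumption modeled on the task keys above (an `images` list whose entries carry `image`, `type`, and `source`). `undefined_images` is a hypothetical helper, not liminal's validation code.

```python
def undefined_images(liminal_config):
    """Return image names that are referenced but not declared under `images`."""
    declared = {entry['image'] for entry in liminal_config.get('images', [])}

    referenced = set()
    # Images set in task_defaults, keyed by task type (e.g. 'spark', 'python').
    for defaults in liminal_config.get('task_defaults', {}).values():
        if 'image' in defaults:
            referenced.add(defaults['image'])
    # Images set on individual pipeline tasks.
    for pipeline in liminal_config.get('pipelines', []):
        for task in pipeline.get('tasks', []):
            if 'image' in task:
                referenced.add(task['image'])
    # Images used by services.
    for service in liminal_config.get('services', []):
        if 'image' in service:
            referenced.add(service['image'])

    return sorted(referenced - declared)


# The k8s archetype's task_defaults as they appear in the diff, parsed from YAML.
ARCHETYPE = {
    'task_defaults': {
        'spark': {'executor': 'k8s', 'image': 'my_spark_image', 'source': '.'},
    },
}

# Hypothetical images section: build details (source) move here,
# and the task defaults keep only the image name.
ARCHETYPE_WITH_IMAGES = {
    'images': [{'image': 'my_spark_image', 'type': 'spark', 'source': '.'}],
    'task_defaults': {'spark': {'executor': 'k8s', 'image': 'my_spark_image'}},
}
```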

##########
File path: tests/runners/apps/test_app/extra/liminal.yml
##########
@@ -39,6 +39,7 @@ pipeline_defaults:
       type: python
       description: static input task
       image: my_static_input_task_image
+      no_cache: True

Review comment:
       Need to add an `images` section.

##########
File path: tests/runners/apps/test_app/liminal.yml
##########
@@ -43,6 +43,7 @@ pipelines:
         type: python
         description: parallelized static input task
         image: my_static_input_task_image
+        no_cache: True

Review comment:
       Need to add an `images` section.







+
+    @staticmethod
+    def _dockerfile_path():
+        return os.path.join(os.path.dirname(__file__), 'Dockerfile')
+
+    def _additional_files_from_filename_content_pairs(self):
+        with open(self._dockerfile_path()) as original:
+            data = original.read()
+
+        data = self.__add_pip_install(data)
+        data = self._mount_pip_conf(data)
+
+        return [('Dockerfile', data)]
+
+    @staticmethod
+    def __add_pip_install(data):
+
+        if "" in data:

Review comment:
       The condition should be whether a requirements file exists.
   But I suppose that since this uses the Python image builder as a parent, it always has a `requirements.txt` file? If so, then we can always do the pip install in the Dockerfile and don't need a condition.
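Note that `"" in data` is always true, since the empty string is a substring of every string, so the condition as written never skips anything. If the builder cannot rely on the parent always providing `requirements.txt`, the check could look for the file instead. A minimal sketch: the `{{INSTALL_COMMANDS}}` placeholder is taken from the later revision of `spark.py` in this thread, while the function name and the `RUN pip install` line are assumptions.

```python
import os

_PLACEHOLDER = '{{INSTALL_COMMANDS}}'


def add_pip_install(dockerfile: str, source_dir: str) -> str:
    """Fill the install placeholder only when requirements.txt exists.

    Hypothetical helper: the RUN line is an assumption, not the command
    used by liminal's BasePythonImageBuilder.
    """
    if os.path.isfile(os.path.join(source_dir, 'requirements.txt')):
        commands = 'RUN pip install -r requirements.txt'
    else:
        commands = ''  # nothing to install; drop the placeholder
    return dockerfile.replace(_PLACEHOLDER, commands)
```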







[GitHub] [incubator-liminal] aviemzur commented on a change in pull request #59: [LIMINAL-56] add default executor (k8s) for spark

Posted by GitBox <gi...@apache.org>.
aviemzur commented on a change in pull request #59:
URL: https://github.com/apache/incubator-liminal/pull/59#discussion_r676394285



##########
File path: .gitignore
##########
@@ -25,7 +25,6 @@ venv
 *.pyc
 pip-selfcheck.json
 .DS_Store
-build

Review comment:
       Please change this to `/build` rather than deleting this line

##########
File path: liminal/build/image/spark/spark.py
##########
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+
+from liminal.build.python import BasePythonImageBuilder
+
+
+class SparkImageBuilder(BasePythonImageBuilder):
+    def __init__(self, config, base_path, relative_source_path, tag):
+        super().__init__(config, base_path, relative_source_path, tag)
+
+    @staticmethod
+    def _dockerfile_path():
+        return os.path.join(os.path.dirname(__file__), 'Dockerfile')
+
+    def _additional_files_from_filename_content_pairs(self):
+        with open(self._dockerfile_path()) as original:
+            data = original.read()
+
+        data = self.__add_pip_install(data)
+        data = self._mount_pip_conf(data)
+
+        return [('Dockerfile', data)]
+
+    @staticmethod
+    def __add_pip_install(data):
+        new_data = data
+        new_data = new_data.replace('{{INSTALL_COMMANDS}}',

Review comment:
       IIUC this always happens, so why not just change it in the Dockerfile instead of doing this replace?
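To illustrate the reviewer's point: when the substitution does not depend on any build input, rendering the template always produces the same text, so the line could be written into the Dockerfile directly. The template and install line below are hypothetical.

```python
TEMPLATE = 'FROM python:3.7\n{{INSTALL_COMMANDS}}\nCMD ["python"]\n'
INSTALL = 'RUN pip install -r requirements.txt'  # hypothetical install line


def render(template: str) -> str:
    # Unconditional replace: the result is independent of any builder state.
    return template.replace('{{INSTALL_COMMANDS}}', INSTALL)


# The equivalent static Dockerfile, with the line inlined.
STATIC = 'FROM python:3.7\nRUN pip install -r requirements.txt\nCMD ["python"]\n'
print(render(TEMPLATE) == STATIC)  # True
```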

##########
File path: examples/spark-app-demo/emr/configs/emr_cloudformation_template.yml
##########
@@ -0,0 +1,55 @@
+AWSTemplateFormatVersion: 2010-09-09

Review comment:
       Missing license here. Make sure to run tests locally.
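A rough way to catch this before pushing, sketched in Python: scan files for the ASF header phrase used throughout this PR. The extensions checked and the number of header lines scanned are assumptions; the project's own test suite may perform this check differently.

```python
import os

LICENSE_MARKER = 'Licensed to the Apache Software Foundation (ASF)'
CHECKED_EXTENSIONS = ('.py', '.yml', '.yaml', '.sh')  # assumption


def missing_license(root: str, head_lines: int = 20) -> list:
    """Return sorted paths under `root` whose first `head_lines` lines lack the ASF marker."""
    missing = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            if not name.endswith(CHECKED_EXTENSIONS):
                continue
            path = os.path.join(dirpath, name)
            with open(path, encoding='utf-8', errors='replace') as f:
                # Read at most `head_lines` lines from the top of the file.
                head = ''.join(line for _, line in zip(range(head_lines), f))
            if LICENSE_MARKER not in head:
                missing.append(path)
    return sorted(missing)
```

For example, `missing_license('examples/spark-app-demo')` would list the CloudFormation template above, since it starts directly with `AWSTemplateFormatVersion`.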

##########
File path: tests/runners/airflow/tasks/test_spark_task.py
##########
@@ -57,26 +154,27 @@ def test_get_runnable_command(self):
             [],
             trigger_rule='all_success',
             liminal_config={},
-            pipeline_config={},
+            pipeline_config={'pipeline': 'pipeline'},
             task_config=task_config
         ).get_runnable_command()
 
         self.assertEqual(actual.sort(), expected.sort())
 
+
     def test_missing_spark_arguments(self):
         task_config = {
             'application_source': 'my_app.py',
             'application_arguments': {
-                '--query': "select * from dlk_visitor_funnel_dwh_staging.fact_events where unified_Date_prt >= "
-                           "'{{yesterday_ds}}'",
+                '--query': "select * from dlk_visitor_funnel_dwh_staging.fact_events where"
+                           " unified_Date_prt >= '{{yesterday_ds}}'",
                 '--output': 'mytable'
             }
         }
 
         expected = ['spark-submit', 'my_app.py',
                     '--query',
-                    "select * from dlk_visitor_funnel_dwh_staging.fact_events where unified_Date_prt >="
-                    " '{{yesterday_ds}}'",
+                    "select * from dlk_visitor_funnel_dwh_staging.fact_events where "

Review comment:
       Can we make the table/column/database names more generic? Looks like this was lifted from a fork.
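For example, the fixture could use neutral placeholder names such as `my_db.my_table` and `my_date_column` (hypothetical; any generic names would do). Separately, `list.sort()` sorts in place and returns `None`, so `self.assertEqual(actual.sort(), expected.sort())` compares `None` with `None` and passes whatever the lists contain; `sorted()` compares the actual elements. A sketch, where the `actual` list stands in for `SparkTask(...).get_runnable_command()`:

```python
import unittest

GENERIC_QUERY = ("select * from my_db.my_table where"
                 " my_date_column >= '{{yesterday_ds}}'")

TASK_CONFIG = {
    'application_source': 'my_app.py',
    'application_arguments': {
        '--query': GENERIC_QUERY,
        '--output': 'mytable',
    },
}

EXPECTED = ['spark-submit', 'my_app.py',
            '--query', GENERIC_QUERY,
            '--output', 'mytable']


class SortedComparisonExample(unittest.TestCase):
    def test_sort_returns_none(self):
        # list.sort() returns None, so comparing two .sort() results is vacuous.
        self.assertIsNone(['b', 'a'].sort())

    def test_sorted_compares_contents(self):
        # Stand-in for the command SparkTask would build from TASK_CONFIG.
        actual = ['spark-submit', 'my_app.py', '--output', 'mytable',
                  '--query', GENERIC_QUERY]
        self.assertEqual(sorted(actual), sorted(EXPECTED))
```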
   
   

##########
File path: examples/spark-app-demo/emr/archetype/liminal.yml
##########
@@ -0,0 +1,62 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+---
+# superliminal for local development
+name: InfraSpark
+type: super
+variables:
+  region: us-east-1
+  core_count: 2
+  emr_version: 'emr-6.2.0'
+  template_url: 's3://ni-data-infra-dev/liminal-test/emr_cloudformation_template.yml'

Review comment:
       The bucket name should be configurable; please talk with @naturalett.
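A sketch of one option: keep the bucket as its own variable (the `bucket` name is hypothetical) and derive `template_url` from it using the same `{{...}}` placeholder style the YAML already uses. The resolver below is a simplified stand-in, not liminal's actual variable rendering: it does a single pass and leaves unknown names untouched.

```python
import re

_VAR = re.compile(r'\{\{\s*(\w+)\s*\}\}')


def resolve_variables(variables: dict) -> dict:
    """Substitute {{name}} references between variables in a single pass.

    Simplified stand-in for liminal's templating; unknown names are left as-is
    and non-string values are returned unchanged.
    """
    def render(value):
        if not isinstance(value, str):
            return value
        return _VAR.sub(lambda m: str(variables.get(m.group(1), m.group(0))), value)

    return {key: render(value) for key, value in variables.items()}


superliminal_vars = {
    'bucket': 'my-liminal-bucket',  # hypothetical, set per environment
    'template_url': 's3://{{bucket}}/liminal-test/emr_cloudformation_template.yml',
}
print(resolve_variables(superliminal_vars)['template_url'])
# s3://my-liminal-bucket/liminal-test/emr_cloudformation_template.yml
```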







[GitHub] [incubator-liminal] zionrubin commented on a change in pull request #59: [LIMINAL-56] add default executor (k8s) for spark

Posted by GitBox <gi...@apache.org>.
zionrubin commented on a change in pull request #59:
URL: https://github.com/apache/incubator-liminal/pull/59#discussion_r676626780



##########
File path: liminal/build/image/spark/spark.py
##########
@@ -0,0 +1,45 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import os
+
+from liminal.build.python import BasePythonImageBuilder
+
+
+class SparkImageBuilder(BasePythonImageBuilder):
+    def __init__(self, config, base_path, relative_source_path, tag):
+        super().__init__(config, base_path, relative_source_path, tag)
+
+    @staticmethod
+    def _dockerfile_path():
+        return os.path.join(os.path.dirname(__file__), 'Dockerfile')
+
+    def _additional_files_from_filename_content_pairs(self):
+        with open(self._dockerfile_path()) as original:
+            data = original.read()
+
+        data = self.__add_pip_install(data)
+        data = self._mount_pip_conf(data)
+
+        return [('Dockerfile', data)]
+
+    @staticmethod
+    def __add_pip_install(data):
+        new_data = data
+        new_data = new_data.replace('{{INSTALL_COMMANDS}}',

Review comment:
       I think you asked me to do it in the code instead of in the Dockerfile. I think it shouldn't be in the Dockerfile because we can't assume there that sparkImageBuilder extends PythonImageBuilder.
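The trade-off described here can be sketched as a template-method hook: the shared Dockerfile keeps a placeholder, and each builder decides what goes there, so the Dockerfile itself makes no assumption about the class hierarchy. The class and method names below are hypothetical, loosely modelled on `SparkImageBuilder` in this PR.

```python
class DockerfileTemplateBuilder:
    """Hypothetical base class: renders a Dockerfile template through a hook."""

    PLACEHOLDER = '{{INSTALL_COMMANDS}}'

    def __init__(self, template: str):
        self.template = template

    def install_commands(self) -> str:
        # Default: nothing to install; subclasses override this hook.
        return ''

    def render(self) -> str:
        return self.template.replace(self.PLACEHOLDER, self.install_commands())


class PythonBasedBuilder(DockerfileTemplateBuilder):
    """Hypothetical subclass that knows its image ships a requirements.txt."""

    def install_commands(self) -> str:
        return 'RUN pip install -r requirements.txt'


TEMPLATE = 'FROM python:3.7\n{{INSTALL_COMMANDS}}\n'
print(DockerfileTemplateBuilder(TEMPLATE).render())
print(PythonBasedBuilder(TEMPLATE).render())
```

With this split, only the subclass that actually extends the Python builder adds the pip step, which matches the concern about not encoding that assumption in the Dockerfile.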







[GitHub] [incubator-liminal] aviemzur commented on a change in pull request #59: [LIMINAL-56] add default executor (k8s) for spark

Posted by GitBox <gi...@apache.org>.
aviemzur commented on a change in pull request #59:
URL: https://github.com/apache/incubator-liminal/pull/59#discussion_r672879307



##########
File path: tests/runners/airflow/tasks/test_spark_task.py
##########
@@ -57,26 +154,27 @@ def test_get_runnable_command(self):
             [],
             trigger_rule='all_success',
             liminal_config={},
-            pipeline_config={},
+            pipeline_config={'pipeline': 'pipeline'},
             task_config=task_config
         ).get_runnable_command()
 
         self.assertEqual(actual.sort(), expected.sort())
 
+
     def test_missing_spark_arguments(self):
         task_config = {
             'application_source': 'my_app.py',
             'application_arguments': {
-                '--query': "select * from dlk_visitor_funnel_dwh_staging.fact_events where unified_Date_prt >= "
-                           "'{{yesterday_ds}}'",
+                '--query': "select * from dlk_visitor_funnel_dwh_staging.fact_events where"
+                           " unified_Date_prt >= '{{yesterday_ds}}'",
                 '--output': 'mytable'
             }
         }
 
         expected = ['spark-submit', 'my_app.py',
                     '--query',
-                    "select * from dlk_visitor_funnel_dwh_staging.fact_events where unified_Date_prt >="
-                    " '{{yesterday_ds}}'",
+                    "select * from dlk_visitor_funnel_dwh_staging.fact_events where "

Review comment:
       Can we make the table/column/database names more generic? Looks like this was lifted from a fork.







[GitHub] [incubator-liminal] zionrubin commented on a change in pull request #59: [LIMINAL-56] add default executor (k8s) for spark

Posted by GitBox <gi...@apache.org>.
zionrubin commented on a change in pull request #59:
URL: https://github.com/apache/incubator-liminal/pull/59#discussion_r673855462



##########
File path: examples/spark-app-demo/k8s/__init__.py
##########
@@ -0,0 +1,17 @@
+#

Review comment:
       @assapin you can start with the k8s example
   (I will add a README soon; meanwhile, you can use the getting_started doc with the examples/spark-app-demo/k8s files)






