Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2021/07/14 23:51:52 UTC

[GitHub] [beam] ibzib commented on a change in pull request #15169: [BEAM-12613] Enable Python build tests for Samza

ibzib commented on a change in pull request #15169:
URL: https://github.com/apache/beam/pull/15169#discussion_r670025003



##########
File path: .test-infra/jenkins/README.md
##########
@@ -89,6 +89,7 @@ Beam Jenkins overview page: [link](https://ci-beam.apache.org/)
 | beam_PostCommit_Py_VR_Dataflow_V2 | [cron](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/), [phrase](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2_PR/) | `Run Python Dataflow V2 ValidatesRunner` | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2) |
 | beam_PostCommit_Py_ValCont | [cron](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/), [phrase](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont_PR/) | `Run Python Dataflow ValidatesContainer` | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont) |
 | beam_PostCommit_Python_VR_Flink | [cron](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/), [phrase](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/) | `Run Python Flink ValidatesRunner` | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Flink) |
+| beam_PostCommit_Python_VR_Samza | [cron](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/), [phrase](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/) | `Run Python Samza ValidatesRunner` | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Samza) |

Review comment:
       The `phrase` link should differ from the `cron` link; here both point to the same job URL. (I realize the Flink row has the same problem. I dream of someday generating this page automatically.)
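
A minimal sketch of what generating a row could look like, so the two links cannot drift into being identical. It assumes the phrase-triggered job is named by appending `_PR` to the cron job name, as the Dataflow rows in this hunk do; whether such a job exists for Samza is not confirmed here. The function name `readme_row` and the constant `JENKINS_JOB_BASE` are hypothetical, not part of the Beam repo.

```python
# Hypothetical helper: builds one README table row from a job name and
# its trigger phrase. Assumes the "<job>_PR" naming seen in the Dataflow rows.
JENKINS_JOB_BASE = "https://ci-beam.apache.org/job"


def readme_row(job_name, trigger_phrase):
    cron_url = f"{JENKINS_JOB_BASE}/{job_name}/"
    # Assumption: the phrase-triggered job is "<job_name>_PR".
    phrase_url = f"{JENKINS_JOB_BASE}/{job_name}_PR/"
    badge = (
        f"[![Build Status]({JENKINS_JOB_BASE}/{job_name}/badge/icon)]"
        f"({JENKINS_JOB_BASE}/{job_name})"
    )
    return (
        f"| {job_name} | [cron]({cron_url}), [phrase]({phrase_url}) "
        f"| `{trigger_phrase}` | {badge} |"
    )


print(readme_row("beam_PostCommit_Python_VR_Samza",
                 "Run Python Samza ValidatesRunner"))
```

Because both URLs derive from a single job name, a copy-paste slip like the one in this row would not be possible.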

##########
File path: sdks/python/apache_beam/runners/portability/samza_runner_test.py
##########
@@ -0,0 +1,206 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# pytype: skip-file
+
+# Run as
+#
+# pytest samza_runner_test.py[::TestClass::test_case] \
+#     --test-pipeline-options="--environment_type=LOOPBACK"
+import argparse
+import logging
+import shlex
+import unittest
+from shutil import rmtree
+from tempfile import mkdtemp
+
+import pytest
+
+from apache_beam.options.pipeline_options import PortableOptions
+from apache_beam.runners.portability import job_server
+from apache_beam.runners.portability import portable_runner
+from apache_beam.runners.portability import portable_runner_test
+
+_LOGGER = logging.getLogger(__name__)
+
+
+class SamzaRunnerTest(portable_runner_test.PortableRunnerTest):
+  _use_grpc = True
+  _use_subprocesses = True
+
+  expansion_port = None
+  samza_job_server_jar = None
+
+  @pytest.fixture(autouse=True)
+  def parse_options(self, request):
+    if not request.config.option.test_pipeline_options:
+      raise unittest.SkipTest(
+          'Skipping because --test-pipeline-options is not specified.')
+    test_pipeline_options = request.config.option.test_pipeline_options
+    parser = argparse.ArgumentParser(add_help=True)
+    parser.add_argument(
+        '--samza_job_server_jar',
+        help='Job server jar to submit jobs.',
+        action='store')
+    parser.add_argument(
+        '--environment_type',
+        default='LOOPBACK',
+        choices=['DOCKER', 'PROCESS', 'LOOPBACK'],
+        help='Set the environment type for running user code. DOCKER runs '
+        'user code in a container. PROCESS runs user code in '
+        'automatically started processes. LOOPBACK runs user code on '
+        'the same process that originally submitted the job.')
+    parser.add_argument(
+        '--environment_option',
+        '--environment_options',
+        dest='environment_options',
+        action='append',
+        default=None,
+        help=(
+            'Environment configuration for running the user code. '
+            'Recognized options depend on --environment_type.\n '
+            'For DOCKER: docker_container_image (optional)\n '
+            'For PROCESS: process_command (required), process_variables '
+            '(optional, comma-separated)\n '
+            'For EXTERNAL: external_service_address (required)'))
+    known_args, unknown_args = parser.parse_known_args(
+        shlex.split(test_pipeline_options))
+    if unknown_args:
+      _LOGGER.warning('Discarding unrecognized arguments %s' % unknown_args)
+    self.set_samza_job_server_jar(
+        known_args.samza_job_server_jar or
+        job_server.JavaJarJobServer.path_to_beam_jar(
+            ':runners:samza:job-server:shadowJar'))
+    self.environment_type = known_args.environment_type
+    self.environment_options = known_args.environment_options\
+
+  @classmethod
+  def _subprocess_command(cls, job_port, expansion_port):
+    # will be cleaned up at the end of this method, and recreated and used by
+    # the job server
+    tmp_dir = mkdtemp(prefix='samzatest')
+
+    cls.expansion_port = expansion_port
+
+    try:
+      return [
+          'java',
+          '-jar',
+          cls.samza_job_server_jar,
+          '--artifacts-dir',
+          tmp_dir,
+          '--job-port',
+          str(job_port),
+          '--artifact-port',
+          '0',
+          '--expansion-port',
+          str(expansion_port),
+      ]
+    finally:
+      rmtree(tmp_dir)
+
+  @classmethod
+  def set_samza_job_server_jar(cls, samza_job_server_jar):
+    cls.samza_job_server_jar = samza_job_server_jar\

Review comment:
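       Typo? The trailing backslash looks unintended. (The same trailing backslash follows `self.environment_options = known_args.environment_options` in `parse_options`.)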
   ```suggestion
       cls.samza_job_server_jar = samza_job_server_jar
   ```
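
For context, a small sketch of why the stray backslash goes unnoticed: when the next line is blank, Python treats the backslash as a line continuation that joins an empty line, so the statement parses the same with or without it. The variable names below are placeholders rather than code from the PR, and this covers only the blank-line case.

```python
import ast

# A statement ending in a backslash, followed by a blank line.
with_backslash = "x = 1\\\n\ny = 2\n"
# The same statements without the stray backslash.
without_backslash = "x = 1\n\ny = 2\n"

# ast.dump omits line/column attributes by default, so equal dumps mean
# the two sources parse to the same statements.
same = ast.dump(ast.parse(with_backslash)) == ast.dump(ast.parse(without_backslash))
print(same)
```

So the backslash is harmless where it stands, but it would silently join onto whatever code is later added on the following line, which is a good reason to drop it.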



