Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/31 00:41:04 UTC

[GitHub] [beam] ibzib commented on a change in pull request #12385: [BEAM-10527] Migrate Flink and Spark tests to pytest.

ibzib commented on a change in pull request #12385:
URL: https://github.com/apache/beam/pull/12385#discussion_r463345888



##########
File path: sdks/python/apache_beam/runners/portability/flink_runner_test.py
##########
@@ -53,361 +53,380 @@
 from apache_beam.transforms import userstate
 from apache_beam.transforms.sql import SqlTransform
 
+# Run as
+#
+# pytest flink_runner_test.py \
+#     [--test_pipeline_options "--flink_job_server_jar=/path/to/job_server.jar \
+#                               --environment_type=DOCKER"] \
+#     [FlinkRunnerTest.test_method, ...]
+
 _LOGGER = logging.getLogger(__name__)
 
 Row = typing.NamedTuple("Row", [("col1", int), ("col2", unicode)])
 beam.coders.registry.register_coder(Row, beam.coders.RowCoder)
 
-if __name__ == '__main__':
-  # Run as
-  #
-  # python -m apache_beam.runners.portability.flink_runner_test \
-  #     --flink_job_server_jar=/path/to/job_server.jar \
-  #     --environment_type=docker \
-  #     --extra_experiments=beam_experiments \
-  #     [FlinkRunnerTest.test_method, ...]
-
-  parser = argparse.ArgumentParser(add_help=True)
-  parser.add_argument(
-      '--flink_job_server_jar', help='Job server jar to submit jobs.')
-  parser.add_argument(
-      '--streaming',
-      default=False,
-      action='store_true',
-      help='Job type: batch or streaming.')
-  parser.add_argument(
-      '--environment_type',
-      default='loopback',
-      help='Environment type: docker, process, or loopback.')
-  parser.add_argument('--environment_config', help='Environment config.')
-  parser.add_argument(
-      '--extra_experiments',
-      default=[],
-      action='append',
-      help='Beam experiments config.')
-  known_args, args = parser.parse_known_args(sys.argv)
-  sys.argv = args
-
-  flink_job_server_jar = (
-      known_args.flink_job_server_jar or
-      job_server.JavaJarJobServer.path_to_beam_jar(
-          'runners:flink:%s:job-server:shadowJar' %
-          FlinkRunnerOptions.PUBLISHED_FLINK_VERSIONS[-1]))
-  streaming = known_args.streaming
-  environment_type = known_args.environment_type.lower()
-  environment_config = (
-      known_args.environment_config if known_args.environment_config else None)
-  extra_experiments = known_args.extra_experiments
-
-  # This is defined here to only be run when we invoke this file explicitly.
-  class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
-    _use_grpc = True
-    _use_subprocesses = True
-
-    conf_dir = None
-    expansion_port = None
-
-    @classmethod
-    def tearDownClass(cls):
-      if cls.conf_dir and exists(cls.conf_dir):
-        _LOGGER.info("removing conf dir: %s" % cls.conf_dir)
-        rmtree(cls.conf_dir)
-      super(FlinkRunnerTest, cls).tearDownClass()
-
-    @classmethod
-    def _create_conf_dir(cls):
-      """Create (and save a static reference to) a "conf dir", used to provide
-       metrics configs and verify metrics output
-
-       It gets cleaned up when the suite is done executing"""
-
-      if hasattr(cls, 'conf_dir'):
-        cls.conf_dir = mkdtemp(prefix='flinktest-conf')
-
-        # path for a FileReporter to write metrics to
-        cls.test_metrics_path = path.join(cls.conf_dir, 'test-metrics.txt')
-
-        # path to write Flink configuration to
-        conf_path = path.join(cls.conf_dir, 'flink-conf.yaml')
-        file_reporter = 'org.apache.beam.runners.flink.metrics.FileReporter'
-        with open(conf_path, 'w') as f:
-          f.write(
-              linesep.join([
-                  'metrics.reporters: file',
-                  'metrics.reporter.file.class: %s' % file_reporter,
-                  'metrics.reporter.file.path: %s' % cls.test_metrics_path,
-                  'metrics.scope.operator: <operator_name>',
-              ]))
-
-    @classmethod
-    def _subprocess_command(cls, job_port, expansion_port):
-      # will be cleaned up at the end of this method, and recreated and used by
-      # the job server
-      tmp_dir = mkdtemp(prefix='flinktest')
-
-      cls._create_conf_dir()
-      cls.expansion_port = expansion_port
-
-      try:
-        return [
-            'java',
-            '-Dorg.slf4j.simpleLogger.defaultLogLevel=warn',
-            '-jar',
-            flink_job_server_jar,
-            '--flink-master',
-            '[local]',
-            '--flink-conf-dir',
-            cls.conf_dir,
-            '--artifacts-dir',
-            tmp_dir,
-            '--job-port',
-            str(job_port),
-            '--artifact-port',
-            '0',
-            '--expansion-port',
-            str(expansion_port),
-        ]
-      finally:
-        rmtree(tmp_dir)
-
-    @classmethod
-    def get_runner(cls):
-      return portable_runner.PortableRunner()
-
-    @classmethod
-    def get_expansion_service(cls):
-      # TODO Move the expansion service address into PipelineOptions
-      return 'localhost:%s' % cls.expansion_port
-
-    def create_options(self):
-      options = super(FlinkRunnerTest, self).create_options()
-      options.view_as(
-          DebugOptions).experiments = ['beam_fn_api'] + extra_experiments
-      options._all_options['parallelism'] = 2
-      options.view_as(PortableOptions).environment_type = (
-          environment_type.upper())
-      if environment_config:
-        options.view_as(PortableOptions).environment_config = environment_config
-
-      if streaming:
-        options.view_as(StandardOptions).streaming = True
-      return options
-
-    # Can't read host files from within docker; read a "local" file there.
-    def test_read(self):
-      with self.create_pipeline() as p:
-        lines = p | beam.io.ReadFromText('/etc/profile')
-        assert_that(lines, lambda lines: len(lines) > 0)
-
-    def test_no_subtransform_composite(self):
-      raise unittest.SkipTest("BEAM-4781")
 
-    def test_external_transform(self):
+class FlinkRunnerTest(portable_runner_test.PortableRunnerTest):
+  _use_grpc = True
+  _use_subprocesses = True
+
+  conf_dir = None
+  expansion_port = None
+  flink_job_server_jar = None
+
+  def __init__(self, *args, **kwargs):
+    super(FlinkRunnerTest, self).__init__(*args, **kwargs)
+    self.environment_type = None
+    self.environment_config = None
+
+  @pytest.fixture(autouse=True)
+  def parse_options(self, request):
+    test_pipeline_options = (
+        request.config.option.test_pipeline_options
+        if request.config.option.test_pipeline_options else '')
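
The autouse fixture above pulls a --test_pipeline_options flag off pytest's
config object. For that attribute to exist, the flag has to be registered in a
conftest.py via the pytest_addoption hook. The sketch below is a minimal,
hypothetical illustration of that plumbing: the flag name matches the diff, but
the default and help text are assumptions, not Beam's actual conftest.

    # conftest.py -- hypothetical sketch, not Beam's actual conftest.
    def pytest_addoption(parser):
      # Register the flag so request.config.option.test_pipeline_options
      # is populated for the autouse parse_options fixture.
      parser.addoption(
          '--test_pipeline_options',
          default=None,
          help='Pipeline options forwarded to the runner tests, e.g. '
          '"--flink_job_server_jar=/path/to/job_server.jar '
          '--environment_type=DOCKER".')

With that in place, an invocation such as
pytest flink_runner_test.py --test_pipeline_options="--environment_type=DOCKER"
makes the option string available to every test in the module.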

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org