Posted to commits@beam.apache.org by ro...@apache.org on 2017/12/13 01:31:04 UTC

[beam] branch master updated: Minor changes to the FnApi Runner to allow Java to run. (#4220)

This is an automated email from the ASF dual-hosted git repository.

robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 60dd7c4  Minor changes to the FnApi Runner to allow Java to run. (#4220)
60dd7c4 is described below

commit 60dd7c4035a8d320b01823cbd4c959d0ef09cf26
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Tue Dec 12 17:30:59 2017 -0800

    Minor changes to the FnApi Runner to allow Java to run. (#4220)
    
    * Minor changes to the FnApi Runner to allow Java to run.
    
    * Aligned the GBK urn.
    * Started a logging service, which Java can't run without.
    * Added some logging that was helpful for debugging.
---
 .../runners/portability/fn_api_runner.py           |  7 +++++-
 .../runners/portability/universal_local_runner.py  | 29 +++++++++++++++++++++-
 .../portability/universal_local_runner_main.py     |  1 +
 sdks/python/apache_beam/utils/urns.py              |  2 +-
 4 files changed, 36 insertions(+), 3 deletions(-)

diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index e40faa5..9b143c6 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -745,10 +745,12 @@ class FnApiRunner(runner.PipelineRunner):
             state_key, elements_data, process_bundle.instruction_id)
 
     # Register and start running the bundle.
+    logging.debug('Register and start running the bundle')
     controller.control_handler.push(process_bundle_registration)
     controller.control_handler.push(process_bundle)
 
     # Wait for the bundle to finish.
+    logging.debug('Wait for the bundle to finish.')
     while True:
       result = controller.control_handler.pull()
       if result and result.instruction_id == process_bundle.instruction_id:
@@ -756,11 +758,14 @@ class FnApiRunner(runner.PipelineRunner):
           raise RuntimeError(result.error)
         break
 
-    # Gather all output data.
     expected_targets = [
         beam_fn_api_pb2.Target(primitive_transform_reference=transform_id,
                                name=output_name)
         for (transform_id, output_name), _ in data_output.items()]
+
+    # Gather all output data.
+    logging.debug('Gather all output data from %s.', expected_targets)
+
     for output in controller.data_plane_handler.input_elements(
         process_bundle.instruction_id, expected_targets):
       target_tuple = (
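
Note that the logging.debug calls added above only show up when the root logger is at DEBUG level. A minimal sketch of surfacing them while running a small pipeline on the FnApiRunner locally (the pipeline below is purely illustrative, not part of this change):

    import logging

    import apache_beam as beam
    from apache_beam.runners.portability import fn_api_runner

    # Surface the new 'Register and start running the bundle' /
    # 'Wait for the bundle to finish.' / 'Gather all output data' messages.
    logging.getLogger().setLevel(logging.DEBUG)

    p = beam.Pipeline(runner=fn_api_runner.FnApiRunner())
    (p
     | beam.Create(['a', 'b', 'a'])
     | beam.Map(lambda x: (x, 1))
     | beam.GroupByKey())
    p.run().wait_until_finish()
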
diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner.py b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
index b951194..365803d 100644
--- a/sdks/python/apache_beam/runners/portability/universal_local_runner.py
+++ b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
@@ -31,6 +31,7 @@ from concurrent import futures
 import grpc
 from google.protobuf import text_format
 
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
 from apache_beam.portability.api import beam_job_api_pb2
 from apache_beam.portability.api import beam_job_api_pb2_grpc
 from apache_beam.portability.api import endpoints_pb2
@@ -241,6 +242,7 @@ class BeamJob(threading.Thread):
             use_grpc=self._use_grpc,
             sdk_harness_factory=self._sdk_harness_factory
         ).run_via_runner_api(self._pipeline_proto)
+        logging.info("Successfully completed job.")
         self.state = beam_job_api_pb2.JobState.DONE
       except:  # pylint: disable=bare-except
         logging.exception("Error running pipeline.")
@@ -271,10 +273,12 @@ class JobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
     port = self._server.add_insecure_port('localhost:%d' % port)
     beam_job_api_pb2_grpc.add_JobServiceServicer_to_server(self, self._server)
     self._server.start()
+    logging.info("Grpc server started on port %s", port)
     return port
 
   def Prepare(self, request, context=None):
     # For now, just use the job name as the job id.
+    logging.debug("Got Prepare request.")
     preparation_id = "%s-%s" % (request.job_name, uuid.uuid4())
     if self._worker_command_line:
       sdk_harness_factory = functools.partial(
@@ -284,10 +288,12 @@ class JobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
     self._jobs[preparation_id] = BeamJob(
         preparation_id, request.pipeline_options, request.pipeline,
         use_grpc=self._use_grpc, sdk_harness_factory=sdk_harness_factory)
+    logging.debug("Prepared job '%s' as '%s'", request.job_name, preparation_id)
     return beam_job_api_pb2.PrepareJobResponse(preparation_id=preparation_id)
 
   def Run(self, request, context=None):
     job_id = request.preparation_id
+    logging.debug("Running job '%s'", job_id)
     self._jobs[job_id].start()
     return beam_job_api_pb2.RunJobResponse(job_id=job_id)
 
@@ -330,6 +336,14 @@ class JobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
       pass
 
 
+class BeamFnLoggingServicer(beam_fn_api_pb2_grpc.BeamFnLoggingServicer):
+  def Logging(self, log_bundles, context=None):
+    for log_bundle in log_bundles:
+      for log_entry in log_bundle.log_entries:
+        logging.info('Worker: %s', str(log_entry).replace('\n', ' '))
+    return iter([])
+
+
 class SubprocessSdkWorker(object):
   """Manages an SDK worker implemented as a subprocess communicating over grpc.
   """
@@ -339,13 +353,25 @@ class SubprocessSdkWorker(object):
     self._control_address = control_address
 
   def run(self):
+    logging_server = grpc.server(
+        futures.ThreadPoolExecutor(max_workers=10))
+    logging_port = logging_server.add_insecure_port('[::]:0')
+    logging_server.start()
+    logging_servicer = BeamFnLoggingServicer()
+    beam_fn_api_pb2_grpc.add_BeamFnLoggingServicer_to_server(
+        logging_servicer, logging_server)
+    logging_descriptor = text_format.MessageToString(
+        endpoints_pb2.ApiServiceDescriptor(url='localhost:%s' % logging_port))
+
     control_descriptor = text_format.MessageToString(
         endpoints_pb2.ApiServiceDescriptor(url=self._control_address))
+
     p = subprocess.Popen(
         self._worker_command_line,
         shell=True,
         env=dict(os.environ,
-                 CONTROL_API_SERVICE_DESCRIPTOR=control_descriptor))
+                 CONTROL_API_SERVICE_DESCRIPTOR=control_descriptor,
+                 LOGGING_API_SERVICE_DESCRIPTOR=logging_descriptor))
     try:
       p.wait()
       if p.returncode:
@@ -354,6 +380,7 @@ class SubprocessSdkWorker(object):
     finally:
       if p.poll() is None:
         p.kill()
+      logging_server.stop(0)
 
 
 class JobLogHandler(logging.Handler):
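
The new BeamFnLoggingServicer above is exposed to the worker subprocess through the LOGGING_API_SERVICE_DESCRIPTOR environment variable, which carries a text-format ApiServiceDescriptor. A minimal sketch of how a worker-side process might decode that variable (the decoding shown here is illustrative; the real SDK harnesses have their own startup paths):

    import os

    from google.protobuf import text_format

    from apache_beam.portability.api import endpoints_pb2

    # Parse the text-format descriptor exported by SubprocessSdkWorker.run().
    logging_descriptor = endpoints_pb2.ApiServiceDescriptor()
    text_format.Merge(
        os.environ['LOGGING_API_SERVICE_DESCRIPTOR'], logging_descriptor)
    print('Logging service listening at %s' % logging_descriptor.url)
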
diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner_main.py b/sdks/python/apache_beam/runners/portability/universal_local_runner_main.py
index 9dd3a7e..93781e1 100644
--- a/sdks/python/apache_beam/runners/portability/universal_local_runner_main.py
+++ b/sdks/python/apache_beam/runners/portability/universal_local_runner_main.py
@@ -41,4 +41,5 @@ def run(argv):
 
 
 if __name__ == '__main__':
+  logging.getLogger().setLevel(logging.INFO)
   run(sys.argv)
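
With the root logger now at INFO, the job service logs its port and request handling on startup. For context, a minimal client-side sketch of talking to a job service started by this module (the port and job_name are illustrative, and the pipeline proto and pipeline options are omitted for brevity, so a real submission would also populate those PrepareJobRequest fields):

    import grpc

    from apache_beam.portability.api import beam_job_api_pb2
    from apache_beam.portability.api import beam_job_api_pb2_grpc

    channel = grpc.insecure_channel('localhost:3333')  # illustrative port
    stub = beam_job_api_pb2_grpc.JobServiceStub(channel)

    # Prepare returns a preparation_id (see JobServicer.Prepare above), which
    # Run then uses to start the job.
    prepare_response = stub.Prepare(
        beam_job_api_pb2.PrepareJobRequest(job_name='example'))
    run_response = stub.Run(beam_job_api_pb2.RunJobRequest(
        preparation_id=prepare_response.preparation_id))
    print('Started job %s' % run_response.job_id)
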
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 387c8d6..6bdc29f 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -37,7 +37,7 @@ PICKLED_COMBINE_FN = "beam:combinefn:pickled_python:v0.1"
 
 PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"
 PARDO_TRANSFORM = "beam:ptransform:pardo:v0.1"
-GROUP_BY_KEY_TRANSFORM = "beam:ptransform:group_by_key:v0.1"
+GROUP_BY_KEY_TRANSFORM = "urn:beam:transform:groupbykey:v1"
 GROUP_BY_KEY_ONLY_TRANSFORM = "beam:ptransform:group_by_key_only:v0.1"
 GROUP_ALSO_BY_WINDOW_TRANSFORM = "beam:ptransform:group_also_by_window:v0.1"
 COMBINE_PER_KEY_TRANSFORM = "beam:ptransform:combine_per_key:v0.1"
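
The GROUP_BY_KEY_TRANSFORM constant now matches the URN the Java SDK attaches to its GroupByKey transforms, so both SDKs resolve to the same primitive in the FnApi Runner. A small sketch of the kind of check this enables (the transform_proto argument is assumed to be a runner-API PTransform message; the helper itself is hypothetical, not part of this change):

    from apache_beam.utils import urns

    def is_group_by_key(transform_proto):
      # GroupByKey transforms from either SDK now carry the same URN.
      return transform_proto.spec.urn == urns.GROUP_BY_KEY_TRANSFORM
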

-- 
To stop receiving notification emails like this one, please contact
commits@beam.apache.org.