You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ro...@apache.org on 2017/12/13 01:31:04 UTC
[beam] branch master updated: Minor changes to the FnApi Runner to allow Java to run. (#4220)
This is an automated email from the ASF dual-hosted git repository.
robertwb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 60dd7c4 Minor changes to the FnApi Runner to allow Java to run. (#4220)
60dd7c4 is described below
commit 60dd7c4035a8d320b01823cbd4c959d0ef09cf26
Author: Robert Bradshaw <ro...@gmail.com>
AuthorDate: Tue Dec 12 17:30:59 2017 -0800
Minor changes to the FnApi Runner to allow Java to run. (#4220)
* Minor changes to the FnApi Runner to allow Java to run.
* Aligned the GBK urn.
* Started a logging service, which Java can't run without.
* Added some logging that was helpful for debugging.
---
.../runners/portability/fn_api_runner.py | 7 +++++-
.../runners/portability/universal_local_runner.py | 29 +++++++++++++++++++++-
.../portability/universal_local_runner_main.py | 1 +
sdks/python/apache_beam/utils/urns.py | 2 +-
4 files changed, 36 insertions(+), 3 deletions(-)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index e40faa5..9b143c6 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -745,10 +745,12 @@ class FnApiRunner(runner.PipelineRunner):
state_key, elements_data, process_bundle.instruction_id)
# Register and start running the bundle.
+ logging.debug('Register and start running the bundle')
controller.control_handler.push(process_bundle_registration)
controller.control_handler.push(process_bundle)
# Wait for the bundle to finish.
+ logging.debug('Wait for the bundle to finish.')
while True:
result = controller.control_handler.pull()
if result and result.instruction_id == process_bundle.instruction_id:
@@ -756,11 +758,14 @@ class FnApiRunner(runner.PipelineRunner):
raise RuntimeError(result.error)
break
- # Gather all output data.
expected_targets = [
beam_fn_api_pb2.Target(primitive_transform_reference=transform_id,
name=output_name)
for (transform_id, output_name), _ in data_output.items()]
+
+ # Gather all output data.
+ logging.debug('Gather all output data from %s.', expected_targets)
+
for output in controller.data_plane_handler.input_elements(
process_bundle.instruction_id, expected_targets):
target_tuple = (
diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner.py b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
index b951194..365803d 100644
--- a/sdks/python/apache_beam/runners/portability/universal_local_runner.py
+++ b/sdks/python/apache_beam/runners/portability/universal_local_runner.py
@@ -31,6 +31,7 @@ from concurrent import futures
import grpc
from google.protobuf import text_format
+from apache_beam.portability.api import beam_fn_api_pb2_grpc
from apache_beam.portability.api import beam_job_api_pb2
from apache_beam.portability.api import beam_job_api_pb2_grpc
from apache_beam.portability.api import endpoints_pb2
@@ -241,6 +242,7 @@ class BeamJob(threading.Thread):
use_grpc=self._use_grpc,
sdk_harness_factory=self._sdk_harness_factory
).run_via_runner_api(self._pipeline_proto)
+ logging.info("Successfully completed job.")
self.state = beam_job_api_pb2.JobState.DONE
except: # pylint: disable=bare-except
logging.exception("Error running pipeline.")
@@ -271,10 +273,12 @@ class JobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
port = self._server.add_insecure_port('localhost:%d' % port)
beam_job_api_pb2_grpc.add_JobServiceServicer_to_server(self, self._server)
self._server.start()
+ logging.info("Grpc server started on port %s", port)
return port
def Prepare(self, request, context=None):
# For now, just use the job name as the job id.
+ logging.debug("Got Prepare request.")
preparation_id = "%s-%s" % (request.job_name, uuid.uuid4())
if self._worker_command_line:
sdk_harness_factory = functools.partial(
@@ -284,10 +288,12 @@ class JobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
self._jobs[preparation_id] = BeamJob(
preparation_id, request.pipeline_options, request.pipeline,
use_grpc=self._use_grpc, sdk_harness_factory=sdk_harness_factory)
+ logging.debug("Prepared job '%s' as '%s'", request.job_name, preparation_id)
return beam_job_api_pb2.PrepareJobResponse(preparation_id=preparation_id)
def Run(self, request, context=None):
job_id = request.preparation_id
+ logging.debug("Runing job '%s'", job_id)
self._jobs[job_id].start()
return beam_job_api_pb2.RunJobResponse(job_id=job_id)
@@ -330,6 +336,14 @@ class JobServicer(beam_job_api_pb2_grpc.JobServiceServicer):
pass
+class BeamFnLoggingServicer(beam_fn_api_pb2_grpc.BeamFnLoggingServicer):
+ def Logging(self, log_bundles, context=None):
+ for log_bundle in log_bundles:
+ for log_entry in log_bundle.log_entries:
+ logging.info('Worker: %s', str(log_entry).replace('\n', ' '))
+ return iter([])
+
+
class SubprocessSdkWorker(object):
"""Manages a SDK worker implemented as a subprocess communicating over grpc.
"""
@@ -339,13 +353,25 @@ class SubprocessSdkWorker(object):
self._control_address = control_address
def run(self):
+ logging_server = grpc.server(
+ futures.ThreadPoolExecutor(max_workers=10))
+ logging_port = logging_server.add_insecure_port('[::]:0')
+ logging_server.start()
+ logging_servicer = BeamFnLoggingServicer()
+ beam_fn_api_pb2_grpc.add_BeamFnLoggingServicer_to_server(
+ logging_servicer, logging_server)
+ logging_descriptor = text_format.MessageToString(
+ endpoints_pb2.ApiServiceDescriptor(url='localhost:%s' % logging_port))
+
control_descriptor = text_format.MessageToString(
endpoints_pb2.ApiServiceDescriptor(url=self._control_address))
+
p = subprocess.Popen(
self._worker_command_line,
shell=True,
env=dict(os.environ,
- CONTROL_API_SERVICE_DESCRIPTOR=control_descriptor))
+ CONTROL_API_SERVICE_DESCRIPTOR=control_descriptor,
+ LOGGING_API_SERVICE_DESCRIPTOR=logging_descriptor))
try:
p.wait()
if p.returncode:
@@ -354,6 +380,7 @@ class SubprocessSdkWorker(object):
finally:
if p.poll() is None:
p.kill()
+ logging_server.stop(0)
class JobLogHandler(logging.Handler):
diff --git a/sdks/python/apache_beam/runners/portability/universal_local_runner_main.py b/sdks/python/apache_beam/runners/portability/universal_local_runner_main.py
index 9dd3a7e..93781e1 100644
--- a/sdks/python/apache_beam/runners/portability/universal_local_runner_main.py
+++ b/sdks/python/apache_beam/runners/portability/universal_local_runner_main.py
@@ -41,4 +41,5 @@ def run(argv):
if __name__ == '__main__':
+ logging.getLogger().setLevel(logging.INFO)
run(sys.argv)
diff --git a/sdks/python/apache_beam/utils/urns.py b/sdks/python/apache_beam/utils/urns.py
index 387c8d6..6bdc29f 100644
--- a/sdks/python/apache_beam/utils/urns.py
+++ b/sdks/python/apache_beam/utils/urns.py
@@ -37,7 +37,7 @@ PICKLED_COMBINE_FN = "beam:combinefn:pickled_python:v0.1"
PICKLED_TRANSFORM = "beam:ptransform:pickled_python:v0.1"
PARDO_TRANSFORM = "beam:ptransform:pardo:v0.1"
-GROUP_BY_KEY_TRANSFORM = "beam:ptransform:group_by_key:v0.1"
+GROUP_BY_KEY_TRANSFORM = "urn:beam:transform:groupbykey:v1"
GROUP_BY_KEY_ONLY_TRANSFORM = "beam:ptransform:group_by_key_only:v0.1"
GROUP_ALSO_BY_WINDOW_TRANSFORM = "beam:ptransform:group_also_by_window:v0.1"
COMBINE_PER_KEY_TRANSFORM = "beam:ptransform:combine_per_key:v0.1"
--
To stop receiving notification emails like this one, please contact
"commits@beam.apache.org" <co...@beam.apache.org>.