You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/12/02 01:32:04 UTC
[beam] branch master updated: Add support for --save_main_session flag
This is an automated email from the ASF dual-hosted git repository.
altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 907f9a9 Add support for --save_main_session flag
new c8585fa This closes #4198
907f9a9 is described below
commit 907f9a9e326bbc67ce866d814223ab82ce56c4f1
Author: Ahmet Altay <al...@google.com>
AuthorDate: Thu Nov 30 13:46:48 2017 -0800
Add support for --save_main_session flag
---
.../apache_beam/runners/dataflow/internal/names.py | 2 +-
.../apache_beam/runners/worker/sdk_worker_main.py | 43 +++++++++++++++++++++-
sdks/python/container/boot.go | 1 +
3 files changed, 44 insertions(+), 2 deletions(-)
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index 559b445..6b0fa00 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -17,7 +17,7 @@
"""Various names for properties, transforms, etc."""
-
+# TODO (altay): Move shared names to a common location.
# Standard file names used for staging files.
PICKLED_MAIN_SESSION_FILE = 'pickled_main_session'
DATAFLOW_SDK_TARBALL_FILE = 'dataflow_python_sdk.tar'
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index 70e4c96..684269e 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -17,13 +17,17 @@
"""SDK Fn Harness entry point."""
+import json
import logging
import os
import sys
+import traceback
from google.protobuf import text_format
+from apache_beam.internal import pickler
from apache_beam.portability.api import endpoints_pb2
+from apache_beam.runners.dataflow.internal import names
from apache_beam.runners.worker.log_handler import FnApiLogRecordHandler
from apache_beam.runners.worker.sdk_worker import SdkHarness
@@ -45,8 +49,28 @@ def main(unused_argv):
else:
fn_log_handler = None
+ if 'PIPELINE_OPTIONS' in os.environ:
+ sdk_pipeline_options = json.loads(os.environ['PIPELINE_OPTIONS'])
+ else:
+ sdk_pipeline_options = {}
+
+ if 'SEMI_PERSISTENT_DIRECTORY' in os.environ:
+ semi_persistent_directory = os.environ['SEMI_PERSISTENT_DIRECTORY']
+ else:
+ semi_persistent_directory = None
+
+ logging.info('semi_persistent_directory: %s', semi_persistent_directory)
+
try:
- logging.info('Python sdk harness started.')
+ _load_main_session(semi_persistent_directory)
+ except Exception: # pylint: disable=broad-except
+ exception_details = traceback.format_exc()
+ logging.error(
+ 'Could not load main session: %s', exception_details, exc_info=True)
+
+ try:
+ logging.info('Python sdk harness started with pipeline_options: %s',
+ sdk_pipeline_options)
service_descriptor = endpoints_pb2.ApiServiceDescriptor()
text_format.Merge(os.environ['CONTROL_API_SERVICE_DESCRIPTOR'],
service_descriptor)
@@ -62,5 +86,22 @@ def main(unused_argv):
fn_log_handler.close()
+def _load_main_session(semi_persistent_directory):
+ """Loads a pickled main session from the path specified."""
+ if semi_persistent_directory:
+ session_file = os.path.join(
+ semi_persistent_directory, 'staged', names.PICKLED_MAIN_SESSION_FILE)
+ if os.path.isfile(session_file):
+ pickler.load_session(session_file)
+ else:
+ logging.warning(
+ 'No session file found: %s. Functions defined in __main__ '
+ '(interactive session) may fail.', session_file)
+ else:
+ logging.warning(
+ 'No semi_persistent_directory found: Functions defined in __main__ '
+ '(interactive session) may fail.')
+
+
if __name__ == '__main__':
main(sys.argv)
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index fea0935..31c8267 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -95,6 +95,7 @@ func main() {
// (3) Invoke python
os.Setenv("PIPELINE_OPTIONS", options)
+ os.Setenv("SEMI_PERSISTENT_DIRECTORY", *semiPersistDir)
os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", proto.MarshalTextString(&pb.ApiServiceDescriptor{Url: *loggingEndpoint}))
os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", proto.MarshalTextString(&pb.ApiServiceDescriptor{Url: *controlEndpoint}))
--
To stop receiving notification emails like this one, please contact
['"commits@beam.apache.org" <co...@beam.apache.org>'].