You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/12/02 01:32:04 UTC

[beam] branch master updated: Add support for --save_main_session flag

This is an automated email from the ASF dual-hosted git repository.

altay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 907f9a9  Add support for --save_main_session flag
     new c8585fa  This closes #4198
907f9a9 is described below

commit 907f9a9e326bbc67ce866d814223ab82ce56c4f1
Author: Ahmet Altay <al...@google.com>
AuthorDate: Thu Nov 30 13:46:48 2017 -0800

    Add support for --save_main_session flag
---
 .../apache_beam/runners/dataflow/internal/names.py |  2 +-
 .../apache_beam/runners/worker/sdk_worker_main.py  | 43 +++++++++++++++++++++-
 sdks/python/container/boot.go                      |  1 +
 3 files changed, 44 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index 559b445..6b0fa00 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -17,7 +17,7 @@
 
 """Various names for properties, transforms, etc."""
 
-
+# TODO (altay): Move shared names to a common location.
 # Standard file names used for staging files.
 PICKLED_MAIN_SESSION_FILE = 'pickled_main_session'
 DATAFLOW_SDK_TARBALL_FILE = 'dataflow_python_sdk.tar'
diff --git a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
index 70e4c96..684269e 100644
--- a/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
+++ b/sdks/python/apache_beam/runners/worker/sdk_worker_main.py
@@ -17,13 +17,17 @@
 
 """SDK Fn Harness entry point."""
 
+import json
 import logging
 import os
 import sys
+import traceback
 
 from google.protobuf import text_format
 
+from apache_beam.internal import pickler
 from apache_beam.portability.api import endpoints_pb2
+from apache_beam.runners.dataflow.internal import names
 from apache_beam.runners.worker.log_handler import FnApiLogRecordHandler
 from apache_beam.runners.worker.sdk_worker import SdkHarness
 
@@ -45,8 +49,28 @@ def main(unused_argv):
   else:
     fn_log_handler = None
 
+  if 'PIPELINE_OPTIONS' in os.environ:
+    sdk_pipeline_options = json.loads(os.environ['PIPELINE_OPTIONS'])
+  else:
+    sdk_pipeline_options = {}
+
+  if 'SEMI_PERSISTENT_DIRECTORY' in os.environ:
+    semi_persistent_directory = os.environ['SEMI_PERSISTENT_DIRECTORY']
+  else:
+    semi_persistent_directory = None
+
+  logging.info('semi_persistent_directory: %s', semi_persistent_directory)
+
   try:
-    logging.info('Python sdk harness started.')
+    _load_main_session(semi_persistent_directory)
+  except Exception:  # pylint: disable=broad-except
+    exception_details = traceback.format_exc()
+    logging.error(
+        'Could not load main session: %s', exception_details, exc_info=True)
+
+  try:
+    logging.info('Python sdk harness started with pipeline_options: %s',
+                 sdk_pipeline_options)
     service_descriptor = endpoints_pb2.ApiServiceDescriptor()
     text_format.Merge(os.environ['CONTROL_API_SERVICE_DESCRIPTOR'],
                       service_descriptor)
@@ -62,5 +86,22 @@ def main(unused_argv):
       fn_log_handler.close()
 
 
+def _load_main_session(semi_persistent_directory):
+  """Loads a pickled main session from the path specified."""
+  if semi_persistent_directory:
+    session_file = os.path.join(
+        semi_persistent_directory, 'staged', names.PICKLED_MAIN_SESSION_FILE)
+    if os.path.isfile(session_file):
+      pickler.load_session(session_file)
+    else:
+      logging.warning(
+          'No session file found: %s. Functions defined in __main__ '
+          '(interactive session) may fail.', session_file)
+  else:
+    logging.warning(
+        'No semi_persistent_directory found: Functions defined in __main__ '
+        '(interactive session) may fail.')
+
+
 if __name__ == '__main__':
   main(sys.argv)
diff --git a/sdks/python/container/boot.go b/sdks/python/container/boot.go
index fea0935..31c8267 100644
--- a/sdks/python/container/boot.go
+++ b/sdks/python/container/boot.go
@@ -95,6 +95,7 @@ func main() {
 	// (3) Invoke python
 
 	os.Setenv("PIPELINE_OPTIONS", options)
+	os.Setenv("SEMI_PERSISTENT_DIRECTORY", *semiPersistDir)
 	os.Setenv("LOGGING_API_SERVICE_DESCRIPTOR", proto.MarshalTextString(&pb.ApiServiceDescriptor{Url: *loggingEndpoint}))
 	os.Setenv("CONTROL_API_SERVICE_DESCRIPTOR", proto.MarshalTextString(&pb.ApiServiceDescriptor{Url: *controlEndpoint}))
 

-- 
To stop receiving notification emails like this one, please contact
commits@beam.apache.org.