You are viewing a plain-text version of this content. Its canonical link was a hyperlink on the original page and is not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by ro...@apache.org on 2016/12/06 18:14:22 UTC

[1/2] incubator-beam git commit: Fix the pickle issue with the inconsistency of dill load and dump session

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 6dcc429e5 -> f19f767b0


Fix the pickle issue with the inconsistency of dill load and dump session


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aef4858b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aef4858b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aef4858b

Branch: refs/heads/python-sdk
Commit: aef4858b80dfacf3401e6672b9373c82a8e77027
Parents: 6dcc429
Author: Sourabh Bajaj <so...@google.com>
Authored: Fri Dec 2 15:02:18 2016 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Dec 6 10:14:10 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/internal/apiclient.py | 19 +++++++++++++++++--
 sdks/python/apache_beam/internal/pickler.py   |  8 ++++++++
 2 files changed, 25 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aef4858b/sdks/python/apache_beam/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index c5f5f70..f1341a7 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -46,7 +46,6 @@ from apache_beam.utils.options import WorkerOptions
 from apache_beam.internal.clients import storage
 import apache_beam.internal.clients.dataflow as dataflow
 
-
 BIGQUERY_API_SERVICE = 'bigquery.googleapis.com'
 COMPUTE_API_SERVICE = 'compute.googleapis.com'
 STORAGE_API_SERVICE = 'storage.googleapis.com'
@@ -55,13 +54,19 @@ STORAGE_API_SERVICE = 'storage.googleapis.com'
 class Step(object):
   """Wrapper for a dataflow Step protobuf."""
 
-  def __init__(self, step_kind, step_name):
+  def __init__(self, step_kind, step_name, additional_properties=None):
     self.step_kind = step_kind
     self.step_name = step_name
     self.proto = dataflow.Step(kind=step_kind, name=step_name)
     self.proto.properties = {}
+    self._additional_properties = []
+
+    if additional_properties is not None:
+      for (n, v, t) in additional_properties:
+        self.add_property(n, v, t)
 
   def add_property(self, name, value, with_type=False):
+    self._additional_properties.append((name, value, with_type))
     self.proto.properties.additionalProperties.append(
         dataflow.Step.PropertiesValue.AdditionalProperty(
             key=name, value=to_json_value(value, with_type=with_type)))
@@ -77,6 +82,11 @@ class Step(object):
               outputs.append(entry_prop.value.string_value)
     return outputs
 
+  def __reduce__(self):
+    """Reduce hook for pickling the Step class more easily
+    """
+    return (Step, (self.step_kind, self.step_name, self._additional_properties))
+
   def get_output(self, tag=None):
     """Returns name if it is one of the outputs or first output if name is None.
 
@@ -330,6 +340,11 @@ class Job(object):
   def json(self):
     return encoding.MessageToJson(self.proto)
 
+  def __reduce__(self):
+    """Reduce hook for pickling the Job class more easily
+    """
+    return (Job, (self.options,))
+
 
 class DataflowApplicationClient(object):
   """A Dataflow API client used by application code to create and query jobs."""

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aef4858b/sdks/python/apache_beam/internal/pickler.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py
index 30f0b77..d39a497 100644
--- a/sdks/python/apache_beam/internal/pickler.py
+++ b/sdks/python/apache_beam/internal/pickler.py
@@ -204,6 +204,14 @@ def loads(encoded):
 
 
 def dump_session(file_path):
+  """Pickle the current python session to be used in the worker.
+
+  Note: Due to the inconsistency in the first dump of dill dump_session we
+  create and load the dump twice to have consistent results in the worker and
+  the running session. Check: https://github.com/uqfoundation/dill/issues/195
+  """
+  dill.dump_session(file_path)
+  dill.load_session(file_path)
   return dill.dump_session(file_path)
 
 


[2/2] incubator-beam git commit: Closes #1485

Posted by ro...@apache.org.
Closes #1485


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f19f767b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f19f767b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f19f767b

Branch: refs/heads/python-sdk
Commit: f19f767b09275bdea325bb37a3767d96eeacd4a0
Parents: 6dcc429 aef4858
Author: Robert Bradshaw <ro...@gmail.com>
Authored: Tue Dec 6 10:14:11 2016 -0800
Committer: Robert Bradshaw <ro...@gmail.com>
Committed: Tue Dec 6 10:14:11 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/internal/apiclient.py | 19 +++++++++++++++++--
 sdks/python/apache_beam/internal/pickler.py   |  8 ++++++++
 2 files changed, 25 insertions(+), 2 deletions(-)
----------------------------------------------------------------------