You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/14 23:12:50 UTC
[15/50] [abbrv] incubator-beam git commit: Move all files to
apache_beam folder
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/internal/pickler.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/internal/pickler.py b/sdks/python/google/cloud/dataflow/internal/pickler.py
deleted file mode 100644
index 00f7fc7..0000000
--- a/sdks/python/google/cloud/dataflow/internal/pickler.py
+++ /dev/null
@@ -1,205 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Pickler for values, functions, and classes.
-
-Pickles created by the pickling library contain non-ASCII characters, so
-we base64-encode the results so that we can put them in JSON objects.
-The pickler is used to embed FlatMap callable objects into the workflow JSON
-description.
-
-The pickler module should be used to pickle functions and modules; for values,
-the coders.*PickleCoder classes should be used instead.
-"""
-
-import base64
-import logging
-import sys
-import traceback
-import types
-
-import dill
-
-
-def is_nested_class(cls):
- """Returns true if argument is a class object that appears to be nested."""
- return (isinstance(cls, type)
- and cls.__module__ != '__builtin__'
- and cls.__name__ not in sys.modules[cls.__module__].__dict__)
-
-
-def find_containing_class(nested_class):
-  """Finds containing class of a nested class passed as argument."""
-
- def find_containing_class_inner(outer):
- for k, v in outer.__dict__.items():
- if v is nested_class:
- return outer, k
- elif isinstance(v, (type, types.ClassType)) and hasattr(v, '__dict__'):
- res = find_containing_class_inner(v)
- if res: return res
-
- return find_containing_class_inner(sys.modules[nested_class.__module__])
-
-
-def _nested_type_wrapper(fun):
- """A wrapper for the standard pickler handler for class objects.
-
- Args:
- fun: Original pickler handler for type objects.
-
- Returns:
- A wrapper for type objects that handles nested classes.
-
- The wrapper detects if an object being pickled is a nested class object.
- For nested class object only it will save the containing class object so
- the nested structure is recreated during unpickle.
- """
-
- def wrapper(pickler, obj):
- # When the nested class is defined in the __main__ module we do not have to
- # do anything special because the pickler itself will save the constituent
- # parts of the type (i.e., name, base classes, dictionary) and then
- # recreate it during unpickling.
- if is_nested_class(obj) and obj.__module__ != '__main__':
- containing_class_and_name = find_containing_class(obj)
- if containing_class_and_name is not None:
- return pickler.save_reduce(
- getattr, containing_class_and_name, obj=obj)
- try:
- return fun(pickler, obj)
- except dill.dill.PicklingError:
- # pylint: disable=protected-access
- return pickler.save_reduce(
- dill.dill._create_type,
- (type(obj), obj.__name__, obj.__bases__,
- dill.dill._dict_from_dictproxy(obj.__dict__)),
- obj=obj)
- # pylint: enable=protected-access
-
- return wrapper
-
-# Monkey patch the standard pickler dispatch table entry for type objects.
-# Dill, for certain types, defers to the standard pickler (including type
-# objects). We wrap the standard handler using _nested_type_wrapper() because
-# for nested class we want to pickle the actual enclosing class object so we
-# can recreate it during unpickling.
-# TODO(silviuc): Make sure we submit the fix upstream to GitHub dill project.
-dill.dill.Pickler.dispatch[type] = _nested_type_wrapper(
- dill.dill.Pickler.dispatch[type])
-
-
-# Dill pickles generator objects without complaint, but unpickling produces
-# TypeError: object.__new__(generator) is not safe, use generator.__new__()
-# on some versions of Python.
-def reject_generators(unused_pickler, unused_obj):
- raise TypeError("can't (safely) pickle generator objects")
-dill.dill.Pickler.dispatch[types.GeneratorType] = reject_generators
-
-
-# This if guards against dill not being fully initialized when generating docs.
-if 'save_module' in dir(dill.dill):
-
- # Always pickle non-main modules by name.
- old_save_module = dill.dill.save_module
-
- @dill.dill.register(dill.dill.ModuleType)
- def save_module(pickler, obj):
- if dill.dill.is_dill(pickler) and obj is pickler._main:
- return old_save_module(pickler, obj)
- else:
- dill.dill.log.info('M2: %s' % obj)
- # pylint: disable=protected-access
- pickler.save_reduce(dill.dill._import_module, (obj.__name__,), obj=obj)
- # pylint: enable=protected-access
- dill.dill.log.info('# M2')
-
- # Pickle module dictionaries (commonly found in lambda's globals)
- # by referencing their module.
- old_save_module_dict = dill.dill.save_module_dict
- known_module_dicts = {}
-
- @dill.dill.register(dict)
- def new_save_module_dict(pickler, obj):
- obj_id = id(obj)
- if not known_module_dicts or '__file__' in obj or '__package__' in obj:
- if obj_id not in known_module_dicts:
- for m in sys.modules.values():
- try:
- if m and m.__name__ != '__main__':
- d = m.__dict__
- known_module_dicts[id(d)] = m, d
- except AttributeError:
- # Skip modules that do not have the __name__ attribute.
- pass
- if obj_id in known_module_dicts and dill.dill.is_dill(pickler):
- m = known_module_dicts[obj_id][0]
- try:
- # pylint: disable=protected-access
- dill.dill._import_module(m.__name__)
- return pickler.save_reduce(
- getattr, (known_module_dicts[obj_id][0], '__dict__'), obj=obj)
- except (ImportError, AttributeError):
- return old_save_module_dict(pickler, obj)
- else:
- return old_save_module_dict(pickler, obj)
- dill.dill.save_module_dict = new_save_module_dict
-
-
- def _nest_dill_logging():
- """Prefix all dill logging with its depth in the callstack.
-
- Useful for debugging pickling of deeply nested structures.
- """
- old_log_info = dill.dill.log.info
- def new_log_info(msg, *args, **kwargs):
- old_log_info(
- ('1 2 3 4 5 6 7 8 9 0 ' * 10)[:len(traceback.extract_stack())] + msg,
- *args, **kwargs)
- dill.dill.log.info = new_log_info
-
-
-# Turn off verbose logging from the dill pickler.
-logging.getLogger('dill').setLevel(logging.WARN)
-
-
-# TODO(ccy): Currently, there are still instances of pickler.dumps() and
-# pickler.loads() being used for data, which results in an unnecessary base64
-# encoding. This should be cleaned up.
-def dumps(o):
- try:
- return base64.b64encode(dill.dumps(o))
- except Exception: # pylint: disable=broad-except
- dill.dill._trace(True) # pylint: disable=protected-access
- return base64.b64encode(dill.dumps(o))
- finally:
- dill.dill._trace(False) # pylint: disable=protected-access
-
-
-def loads(s):
- try:
- return dill.loads(base64.b64decode(s))
- except Exception: # pylint: disable=broad-except
- dill.dill._trace(True) # pylint: disable=protected-access
- return dill.loads(base64.b64decode(s))
- finally:
- dill.dill._trace(False) # pylint: disable=protected-access
-
-
-def dump_session(file_path):
- return dill.dump_session(file_path)
-
-
-def load_session(file_path):
- return dill.load_session(file_path)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/internal/pickler_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/internal/pickler_test.py b/sdks/python/google/cloud/dataflow/internal/pickler_test.py
deleted file mode 100644
index 4d90084..0000000
--- a/sdks/python/google/cloud/dataflow/internal/pickler_test.py
+++ /dev/null
@@ -1,78 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Unit tests for the pickler module."""
-
-import unittest
-
-from google.cloud.dataflow.internal import module_test
-from google.cloud.dataflow.internal.pickler import dumps
-from google.cloud.dataflow.internal.pickler import loads
-
-
-class PicklerTest(unittest.TestCase):
-
- def test_basics(self):
- self.assertEquals([1, 'a', (u'z',)], loads(dumps([1, 'a', (u'z',)])))
- fun = lambda x: 'xyz-%s' % x
- self.assertEquals('xyz-abc', loads(dumps(fun))('abc'))
-
- def test_lambda_with_globals(self):
- """Tests that the globals of a function are preserved."""
-
- # The point of the test is that the lambda being called after unpickling
- # relies on having the re module being loaded.
- self.assertEquals(
- ['abc', 'def'],
- loads(dumps(module_test.get_lambda_with_globals()))('abc def'))
-
- def test_lambda_with_closure(self):
- """Tests that the closure of a function is preserved."""
- self.assertEquals(
- 'closure: abc',
- loads(dumps(module_test.get_lambda_with_closure('abc')))())
-
- def test_class(self):
- """Tests that a class object is pickled correctly."""
- self.assertEquals(
- ['abc', 'def'],
- loads(dumps(module_test.Xyz))().foo('abc def'))
-
- def test_object(self):
- """Tests that a class instance is pickled correctly."""
- self.assertEquals(
- ['abc', 'def'],
- loads(dumps(module_test.XYZ_OBJECT)).foo('abc def'))
-
- def test_nested_class(self):
- """Tests that a nested class object is pickled correctly."""
- self.assertEquals(
- 'X:abc',
- loads(dumps(module_test.TopClass.NestedClass('abc'))).datum)
- self.assertEquals(
- 'Y:abc',
- loads(dumps(module_test.TopClass.MiddleClass.NestedClass('abc'))).datum)
-
- def test_dynamic_class(self):
- """Tests that a nested class object is pickled correctly."""
- self.assertEquals(
- 'Z:abc',
- loads(dumps(module_test.create_class('abc'))).get())
-
- def test_generators(self):
- with self.assertRaises(TypeError):
- dumps((_ for _ in range(10)))
-
-if __name__ == '__main__':
- unittest.main()
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/internal/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/internal/util.py b/sdks/python/google/cloud/dataflow/internal/util.py
deleted file mode 100644
index c45f3f3..0000000
--- a/sdks/python/google/cloud/dataflow/internal/util.py
+++ /dev/null
@@ -1,90 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Utility functions used throughout the dataflow package."""
-
-
-class ArgumentPlaceholder(object):
-  """A placeholder object replacing PValues in argument lists.
-
- A Fn object can take any number of "side inputs", which are PValues that will
- be evaluated during pipeline execution and will be provided to the function
- at the moment of its execution as positional or keyword arguments.
-
- This is used only internally and should never be used by user code. A custom
- Fn object by the time it executes will have such values replaced with real
- computed values.
- """
-
- def __eq__(self, other):
- """Tests for equality of two placeholder objects.
-
- Args:
- other: Another placeholder object to compare to.
-
- This method is used only for test code. All placeholder objects are
- equal to each other.
- """
- return isinstance(other, ArgumentPlaceholder)
-
-
-def remove_objects_from_args(args, kwargs, pvalue_classes):
- """Replaces all objects of a given type in args/kwargs with a placeholder.
-
- Args:
- args: A list of positional arguments.
- kwargs: A dictionary of keyword arguments.
- pvalue_classes: A tuple of class objects representing the types of the
- arguments that must be replaced with a placeholder value (instance of
- ArgumentPlaceholder)
-
- Returns:
- A 3-tuple containing a modified list of positional arguments, a modified
- dictionary of keyword arguments, and a list of all objects replaced with
- a placeholder value.
- """
- pvals = []
- def swapper(value):
- pvals.append(value)
- return ArgumentPlaceholder()
- new_args = [swapper(v) if isinstance(v, pvalue_classes) else v for v in args]
- # Make sure the order in which we process the dictionary keys is predictable
- # by sorting the entries first. This will be important when putting back
- # PValues.
- new_kwargs = dict((k, swapper(v)) if isinstance(v, pvalue_classes) else (k, v)
- for k, v in sorted(kwargs.iteritems()))
- return (new_args, new_kwargs, pvals)
-
-
-def insert_values_in_args(args, kwargs, values):
- """Replaces all placeholders in args/kwargs with actual values.
-
- Args:
- args: A list of positional arguments.
- kwargs: A dictionary of keyword arguments.
- values: A list of values that will be used to replace placeholder values.
-
- Returns:
- A 2-tuple containing a modified list of positional arguments, and a
- modified dictionary of keyword arguments.
- """
- # Use a local iterator so that we don't modify values.
- v_iter = iter(values)
- new_args = [
- v_iter.next() if isinstance(arg, ArgumentPlaceholder) else arg
- for arg in args]
- new_kwargs = dict(
- (k, v_iter.next()) if isinstance(v, ArgumentPlaceholder) else (k, v)
- for k, v in sorted(kwargs.iteritems()))
- return (new_args, new_kwargs)
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/internal/util_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/internal/util_test.py b/sdks/python/google/cloud/dataflow/internal/util_test.py
deleted file mode 100644
index 6a2fc93..0000000
--- a/sdks/python/google/cloud/dataflow/internal/util_test.py
+++ /dev/null
@@ -1,58 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Unit tests for the util module."""
-
-import unittest
-
-from google.cloud.dataflow.internal.util import ArgumentPlaceholder
-from google.cloud.dataflow.internal.util import insert_values_in_args
-from google.cloud.dataflow.internal.util import remove_objects_from_args
-
-
-class UtilTest(unittest.TestCase):
-
- def test_remove_objects_from_args(self):
- args, kwargs, objs = remove_objects_from_args(
- [1, 'a'], {'x': 1, 'y': 3.14}, (str, float))
- self.assertEquals([1, ArgumentPlaceholder()], args)
- self.assertEquals({'x': 1, 'y': ArgumentPlaceholder()}, kwargs)
- self.assertEquals(['a', 3.14], objs)
-
- def test_remove_objects_from_args_nothing_to_remove(self):
- args, kwargs, objs = remove_objects_from_args(
- [1, 2], {'x': 1, 'y': 2}, (str, float))
- self.assertEquals([1, 2], args)
- self.assertEquals({'x': 1, 'y': 2}, kwargs)
- self.assertEquals([], objs)
-
- def test_insert_values_in_args(self):
- values = ['a', 'b']
- args = [1, ArgumentPlaceholder()]
- kwargs = {'x': 1, 'y': ArgumentPlaceholder()}
- args, kwargs = insert_values_in_args(args, kwargs, values)
- self.assertEquals([1, 'a'], args)
- self.assertEquals({'x': 1, 'y': 'b'}, kwargs)
-
- def test_insert_values_in_args_nothing_to_insert(self):
- values = []
- args = [1, 'a']
- kwargs = {'x': 1, 'y': 'b'}
- args, kwargs = insert_values_in_args(args, kwargs, values)
- self.assertEquals([1, 'a'], args)
- self.assertEquals({'x': 1, 'y': 'b'}, kwargs)
-
-
-if __name__ == '__main__':
- unittest.main()