You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by da...@apache.org on 2016/06/14 23:12:50 UTC

[15/50] [abbrv] incubator-beam git commit: Move all files to apache_beam folder

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/internal/pickler.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/internal/pickler.py b/sdks/python/google/cloud/dataflow/internal/pickler.py
deleted file mode 100644
index 00f7fc7..0000000
--- a/sdks/python/google/cloud/dataflow/internal/pickler.py
+++ /dev/null
@@ -1,205 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Pickler for values, functions, and classes.
-
-Pickles created by the pickling library contain non-ASCII characters, so
-we base64-encode the results so that we can put them in JSON objects.
-The pickler is used to embed FlatMap callable objects into the workflow JSON
-description.
-
-The pickler module should be used to pickle functions and modules; for values,
-the coders.*PickleCoder classes should be used instead.
-"""
-
-import base64
-import logging
-import sys
-import traceback
-import types
-
-import dill
-
-
-def is_nested_class(cls):
-  """Returns true if argument is a class object that appears to be nested."""
-  return (isinstance(cls, type)
-          and cls.__module__ != '__builtin__'
-          and cls.__name__ not in sys.modules[cls.__module__].__dict__)
-
-
-def find_containing_class(nested_class):
-  """Finds containing class of a nestec class passed as argument."""
-
-  def find_containing_class_inner(outer):
-    for k, v in outer.__dict__.items():
-      if v is nested_class:
-        return outer, k
-      elif isinstance(v, (type, types.ClassType)) and hasattr(v, '__dict__'):
-        res = find_containing_class_inner(v)
-        if res: return res
-
-  return find_containing_class_inner(sys.modules[nested_class.__module__])
-
-
-def _nested_type_wrapper(fun):
-  """A wrapper for the standard pickler handler for class objects.
-
-  Args:
-    fun: Original pickler handler for type objects.
-
-  Returns:
-    A wrapper for type objects that handles nested classes.
-
-  The wrapper detects if an object being pickled is a nested class object.
-  For nested class object only it will save the containing class object so
-  the nested structure is recreated during unpickle.
-  """
-
-  def wrapper(pickler, obj):
-    # When the nested class is defined in the __main__ module we do not have to
-    # do anything special because the pickler itself will save the constituent
-    # parts of the type (i.e., name, base classes, dictionary) and then
-    # recreate it during unpickling.
-    if is_nested_class(obj) and obj.__module__ != '__main__':
-      containing_class_and_name = find_containing_class(obj)
-      if containing_class_and_name is not None:
-        return pickler.save_reduce(
-            getattr, containing_class_and_name, obj=obj)
-    try:
-      return fun(pickler, obj)
-    except dill.dill.PicklingError:
-      # pylint: disable=protected-access
-      return pickler.save_reduce(
-          dill.dill._create_type,
-          (type(obj), obj.__name__, obj.__bases__,
-           dill.dill._dict_from_dictproxy(obj.__dict__)),
-          obj=obj)
-      # pylint: enable=protected-access
-
-  return wrapper
-
-# Monkey patch the standard pickler dispatch table entry for type objects.
-# Dill, for certain types, defers to the standard pickler (including type
-# objects). We wrap the standard handler using _nested_type_wrapper() because
-# for nested class we want to pickle the actual enclosing class object so we
-# can recreate it during unpickling.
-# TODO(silviuc): Make sure we submit the fix upstream to GitHub dill project.
-dill.dill.Pickler.dispatch[type] = _nested_type_wrapper(
-    dill.dill.Pickler.dispatch[type])
-
-
-# Dill pickles generators objects without complaint, but unpickling produces
-# TypeError: object.__new__(generator) is not safe, use generator.__new__()
-# on some versions of Python.
-def reject_generators(unused_pickler, unused_obj):
-  raise TypeError("can't (safely) pickle generator objects")
-dill.dill.Pickler.dispatch[types.GeneratorType] = reject_generators
-
-
-# This if guards against dill not being fully initialized when generating docs.
-if 'save_module' in dir(dill.dill):
-
-  # Always pickle non-main modules by name.
-  old_save_module = dill.dill.save_module
-
-  @dill.dill.register(dill.dill.ModuleType)
-  def save_module(pickler, obj):
-    if dill.dill.is_dill(pickler) and obj is pickler._main:
-      return old_save_module(pickler, obj)
-    else:
-      dill.dill.log.info('M2: %s' % obj)
-      # pylint: disable=protected-access
-      pickler.save_reduce(dill.dill._import_module, (obj.__name__,), obj=obj)
-      # pylint: enable=protected-access
-      dill.dill.log.info('# M2')
-
-  # Pickle module dictionaries (commonly found in lambda's globals)
-  # by referencing their module.
-  old_save_module_dict = dill.dill.save_module_dict
-  known_module_dicts = {}
-
-  @dill.dill.register(dict)
-  def new_save_module_dict(pickler, obj):
-    obj_id = id(obj)
-    if not known_module_dicts or '__file__' in obj or '__package__' in obj:
-      if obj_id not in known_module_dicts:
-        for m in sys.modules.values():
-          try:
-            if m and m.__name__ != '__main__':
-              d = m.__dict__
-              known_module_dicts[id(d)] = m, d
-          except AttributeError:
-            # Skip modules that do not have the __name__ attribute.
-            pass
-    if obj_id in known_module_dicts and dill.dill.is_dill(pickler):
-      m = known_module_dicts[obj_id][0]
-      try:
-        # pylint: disable=protected-access
-        dill.dill._import_module(m.__name__)
-        return pickler.save_reduce(
-            getattr, (known_module_dicts[obj_id][0], '__dict__'), obj=obj)
-      except (ImportError, AttributeError):
-        return old_save_module_dict(pickler, obj)
-    else:
-      return old_save_module_dict(pickler, obj)
-  dill.dill.save_module_dict = new_save_module_dict
-
-
-  def _nest_dill_logging():
-    """Prefix all dill logging with its depth in the callstack.
-
-    Useful for debugging pickling of deeply nested structures.
-    """
-    old_log_info = dill.dill.log.info
-    def new_log_info(msg, *args, **kwargs):
-      old_log_info(
-          ('1 2 3 4 5 6 7 8 9 0 ' * 10)[:len(traceback.extract_stack())] + msg,
-          *args, **kwargs)
-    dill.dill.log.info = new_log_info
-
-
-# Turn off verbose logging from the dill pickler.
-logging.getLogger('dill').setLevel(logging.WARN)
-
-
-# TODO(ccy): Currently, there are still instances of pickler.dumps() and
-# pickler.loads() being used for data, which results in an unnecessary base64
-# encoding.  This should be cleaned up.
-def dumps(o):
-  try:
-    return base64.b64encode(dill.dumps(o))
-  except Exception:          # pylint: disable=broad-except
-    dill.dill._trace(True)   # pylint: disable=protected-access
-    return base64.b64encode(dill.dumps(o))
-  finally:
-    dill.dill._trace(False)  # pylint: disable=protected-access
-
-
-def loads(s):
-  try:
-    return dill.loads(base64.b64decode(s))
-  except Exception:          # pylint: disable=broad-except
-    dill.dill._trace(True)   # pylint: disable=protected-access
-    return dill.loads(base64.b64decode(s))
-  finally:
-    dill.dill._trace(False)  # pylint: disable=protected-access
-
-
-def dump_session(file_path):
-  return dill.dump_session(file_path)
-
-
-def load_session(file_path):
-  return dill.load_session(file_path)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/internal/pickler_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/internal/pickler_test.py b/sdks/python/google/cloud/dataflow/internal/pickler_test.py
deleted file mode 100644
index 4d90084..0000000
--- a/sdks/python/google/cloud/dataflow/internal/pickler_test.py
+++ /dev/null
@@ -1,78 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Unit tests for the pickler module."""
-
-import unittest
-
-from google.cloud.dataflow.internal import module_test
-from google.cloud.dataflow.internal.pickler import dumps
-from google.cloud.dataflow.internal.pickler import loads
-
-
-class PicklerTest(unittest.TestCase):
-
-  def test_basics(self):
-    self.assertEquals([1, 'a', (u'z',)], loads(dumps([1, 'a', (u'z',)])))
-    fun = lambda x: 'xyz-%s' % x
-    self.assertEquals('xyz-abc', loads(dumps(fun))('abc'))
-
-  def test_lambda_with_globals(self):
-    """Tests that the globals of a function are preserved."""
-
-    # The point of the test is that the lambda being called after unpickling
-    # relies on having the re module being loaded.
-    self.assertEquals(
-        ['abc', 'def'],
-        loads(dumps(module_test.get_lambda_with_globals()))('abc def'))
-
-  def test_lambda_with_closure(self):
-    """Tests that the closure of a function is preserved."""
-    self.assertEquals(
-        'closure: abc',
-        loads(dumps(module_test.get_lambda_with_closure('abc')))())
-
-  def test_class(self):
-    """Tests that a class object is pickled correctly."""
-    self.assertEquals(
-        ['abc', 'def'],
-        loads(dumps(module_test.Xyz))().foo('abc def'))
-
-  def test_object(self):
-    """Tests that a class instance is pickled correctly."""
-    self.assertEquals(
-        ['abc', 'def'],
-        loads(dumps(module_test.XYZ_OBJECT)).foo('abc def'))
-
-  def test_nested_class(self):
-    """Tests that a nested class object is pickled correctly."""
-    self.assertEquals(
-        'X:abc',
-        loads(dumps(module_test.TopClass.NestedClass('abc'))).datum)
-    self.assertEquals(
-        'Y:abc',
-        loads(dumps(module_test.TopClass.MiddleClass.NestedClass('abc'))).datum)
-
-  def test_dynamic_class(self):
-    """Tests that a nested class object is pickled correctly."""
-    self.assertEquals(
-        'Z:abc',
-        loads(dumps(module_test.create_class('abc'))).get())
-
-  def test_generators(self):
-    with self.assertRaises(TypeError):
-      dumps((_ for _ in range(10)))
-
-if __name__ == '__main__':
-  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/internal/util.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/internal/util.py b/sdks/python/google/cloud/dataflow/internal/util.py
deleted file mode 100644
index c45f3f3..0000000
--- a/sdks/python/google/cloud/dataflow/internal/util.py
+++ /dev/null
@@ -1,90 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Utility functions used throughout the dataflow package."""
-
-
-class ArgumentPlaceholder(object):
-  """A place holder object replacing PValues in argument lists.
-
-  A Fn object can take any number of "side inputs", which are PValues that will
-  be evaluated during pipeline execution and will be provided to the function
-  at the moment of its execution as positional or keyword arguments.
-
-  This is used only internally and should never be used by user code. A custom
-  Fn object by the time it executes will have such values replaced with real
-  computed values.
-  """
-
-  def __eq__(self, other):
-    """Tests for equality of two placeholder objects.
-
-    Args:
-      other: Another placeholder object to compare to.
-
-    This method is used only for test code. All placeholder objects are
-    equal to each other.
-    """
-    return isinstance(other, ArgumentPlaceholder)
-
-
-def remove_objects_from_args(args, kwargs, pvalue_classes):
-  """Replaces all objects of a given type in args/kwargs with a placeholder.
-
-  Args:
-    args: A list of positional arguments.
-    kwargs: A dictionary of keyword arguments.
-    pvalue_classes: A tuple of class objects representing the types of the
-      arguments that must be replaced with a placeholder value (instance of
-      ArgumentPlaceholder)
-
-  Returns:
-    A 3-tuple containing a modified list of positional arguments, a modified
-    dictionary of keyword arguments, and a list of all objects replaced with
-    a placeholder value.
-  """
-  pvals = []
-  def swapper(value):
-    pvals.append(value)
-    return ArgumentPlaceholder()
-  new_args = [swapper(v) if isinstance(v, pvalue_classes) else v for v in args]
-  # Make sure the order in which we process the dictionary keys is predictable
-  # by sorting the entries first. This will be important when putting back
-  # PValues.
-  new_kwargs = dict((k, swapper(v)) if isinstance(v, pvalue_classes) else (k, v)
-                    for k, v in sorted(kwargs.iteritems()))
-  return (new_args, new_kwargs, pvals)
-
-
-def insert_values_in_args(args, kwargs, values):
-  """Replaces all placeholders in args/kwargs with actual values.
-
-  Args:
-    args: A list of positional arguments.
-    kwargs: A dictionary of keyword arguments.
-    values: A list of values that will be used to replace placeholder values.
-
-  Returns:
-    A 2-tuple containing a modified list of positional arguments, and a
-    modified dictionary of keyword arguments.
-  """
-  # Use a local iterator so that we don't modify values.
-  v_iter = iter(values)
-  new_args = [
-      v_iter.next() if isinstance(arg, ArgumentPlaceholder) else arg
-      for arg in args]
-  new_kwargs = dict(
-      (k, v_iter.next()) if isinstance(v, ArgumentPlaceholder) else (k, v)
-      for k, v in sorted(kwargs.iteritems()))
-  return (new_args, new_kwargs)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b14dfadd/sdks/python/google/cloud/dataflow/internal/util_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/google/cloud/dataflow/internal/util_test.py b/sdks/python/google/cloud/dataflow/internal/util_test.py
deleted file mode 100644
index 6a2fc93..0000000
--- a/sdks/python/google/cloud/dataflow/internal/util_test.py
+++ /dev/null
@@ -1,58 +0,0 @@
-# Copyright 2016 Google Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""Unit tests for the util module."""
-
-import unittest
-
-from google.cloud.dataflow.internal.util import ArgumentPlaceholder
-from google.cloud.dataflow.internal.util import insert_values_in_args
-from google.cloud.dataflow.internal.util import remove_objects_from_args
-
-
-class UtilTest(unittest.TestCase):
-
-  def test_remove_objects_from_args(self):
-    args, kwargs, objs = remove_objects_from_args(
-        [1, 'a'], {'x': 1, 'y': 3.14}, (str, float))
-    self.assertEquals([1, ArgumentPlaceholder()], args)
-    self.assertEquals({'x': 1, 'y': ArgumentPlaceholder()}, kwargs)
-    self.assertEquals(['a', 3.14], objs)
-
-  def test_remove_objects_from_args_nothing_to_remove(self):
-    args, kwargs, objs = remove_objects_from_args(
-        [1, 2], {'x': 1, 'y': 2}, (str, float))
-    self.assertEquals([1, 2], args)
-    self.assertEquals({'x': 1, 'y': 2}, kwargs)
-    self.assertEquals([], objs)
-
-  def test_insert_values_in_args(self):
-    values = ['a', 'b']
-    args = [1, ArgumentPlaceholder()]
-    kwargs = {'x': 1, 'y': ArgumentPlaceholder()}
-    args, kwargs = insert_values_in_args(args, kwargs, values)
-    self.assertEquals([1, 'a'], args)
-    self.assertEquals({'x': 1, 'y': 'b'}, kwargs)
-
-  def test_insert_values_in_args_nothing_to_insert(self):
-    values = []
-    args = [1, 'a']
-    kwargs = {'x': 1, 'y': 'b'}
-    args, kwargs = insert_values_in_args(args, kwargs, values)
-    self.assertEquals([1, 'a'], args)
-    self.assertEquals({'x': 1, 'y': 'b'}, kwargs)
-
-
-if __name__ == '__main__':
-  unittest.main()