You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/07/30 19:11:19 UTC

[2/3] flink git commit: [FLINK-1927][FLINK-2173][py] Operator distribution rework, fix file paths

http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py
deleted file mode 100644
index cddb9ca..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py
+++ /dev/null
@@ -1,1034 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
-# Copyright (c) 2008-2014 California Institute of Technology.
-# License: 3-clause BSD.  The full license text is available at:
-#  - http://trac.mystic.cacr.caltech.edu/project/pathos/browser/dill/LICENSE
-"""
-dill: a utility for serialization of python objects
-
-Based on code written by Oren Tirosh and Armin Ronacher.
-Extended to a (near) full set of the builtin types (in types module),
-and coded to the pickle interface, by <mm...@caltech.edu>.
-Initial port to python3 by Jonathan Dobson, continued by mmckerns.
-Test against "all" python types (Std. Lib. CH 1-15 @ 2.7) by mmckerns.
-Test against CH16+ Std. Lib. ... TBD.
-"""
-__all__ = ['dump','dumps','load','loads','dump_session','load_session',
-           'Pickler','Unpickler','register','copy','pickle','pickles',
-           'HIGHEST_PROTOCOL','DEFAULT_PROTOCOL','PicklingError',
-           'UnpicklingError','HANDLE_FMODE','CONTENTS_FMODE','FILE_FMODE']
-
-import logging
-log = logging.getLogger("dill")
-log.addHandler(logging.StreamHandler())
-def _trace(boolean):
-    """print a trace through the stack when pickling; useful for debugging"""
-    if boolean: log.setLevel(logging.INFO)
-    else: log.setLevel(logging.WARN)
-    return
-
-import os
-import sys
-diff = None
-_use_diff = False
-PY3 = sys.version_info[0] == 3
-if PY3: #XXX: get types from dill.objtypes ?
-    import builtins as __builtin__
-    from pickle import _Pickler as StockPickler, _Unpickler as StockUnpickler
-    from _thread import LockType
-   #from io import IOBase
-    from types import CodeType, FunctionType, MethodType, GeneratorType, \
-        TracebackType, FrameType, ModuleType, BuiltinMethodType
-    BufferType = memoryview #XXX: unregistered
-    ClassType = type # no 'old-style' classes
-    EllipsisType = type(Ellipsis)
-   #FileType = IOBase
-    NotImplementedType = type(NotImplemented)
-    SliceType = slice
-    TypeType = type # 'new-style' classes #XXX: unregistered
-    XRangeType = range
-    DictProxyType = type(object.__dict__)
-else:
-    import __builtin__
-    from pickle import Pickler as StockPickler, Unpickler as StockUnpickler
-    from thread import LockType
-    from types import CodeType, FunctionType, ClassType, MethodType, \
-         GeneratorType, DictProxyType, XRangeType, SliceType, TracebackType, \
-         NotImplementedType, EllipsisType, FrameType, ModuleType, \
-         BufferType, BuiltinMethodType, TypeType
-from pickle import HIGHEST_PROTOCOL, PicklingError, UnpicklingError
-try:
-    from pickle import DEFAULT_PROTOCOL
-except ImportError:
-    DEFAULT_PROTOCOL = HIGHEST_PROTOCOL
-import __main__ as _main_module
-import marshal
-import gc
-# import zlib
-from weakref import ReferenceType, ProxyType, CallableProxyType
-from functools import partial
-from operator import itemgetter, attrgetter
-# new in python2.5
-if sys.hexversion >= 0x20500f0:
-    from types import MemberDescriptorType, GetSetDescriptorType
-# new in python3.3
-if sys.hexversion < 0x03030000:
-    FileNotFoundError = IOError
-try:
-    import ctypes
-    HAS_CTYPES = True
-except ImportError:
-    HAS_CTYPES = False
-try:
-    from numpy import ufunc as NumpyUfuncType
-    from numpy import ndarray as NumpyArrayType
-    def ndarrayinstance(obj):
-        try:
-            if not isinstance(obj, NumpyArrayType): return False
-        except ReferenceError: return False # handle 'R3' weakref in 3.x
-        # verify that __reduce__ has not been overridden
-        NumpyInstance = NumpyArrayType((0,),'int8')
-        if id(obj.__reduce_ex__) == id(NumpyInstance.__reduce_ex__) and \
-           id(obj.__reduce__) == id(NumpyInstance.__reduce__): return True
-        return False
-except ImportError:
-    NumpyUfuncType = None
-    NumpyArrayType = None
-    def ndarrayinstance(obj): return False
-
-# make sure to add these 'hand-built' types to _typemap
-if PY3:
-    CellType = type((lambda x: lambda y: x)(0).__closure__[0])
-else:
-    CellType = type((lambda x: lambda y: x)(0).func_closure[0])
-WrapperDescriptorType = type(type.__repr__)
-MethodDescriptorType = type(type.__dict__['mro'])
-MethodWrapperType = type([].__repr__)
-PartialType = type(partial(int,base=2))
-SuperType = type(super(Exception, TypeError()))
-ItemGetterType = type(itemgetter(0))
-AttrGetterType = type(attrgetter('__repr__'))
-FileType = type(open(os.devnull, 'rb', buffering=0))
-TextWrapperType = type(open(os.devnull, 'r', buffering=-1))
-BufferedRandomType = type(open(os.devnull, 'r+b', buffering=-1))
-BufferedReaderType = type(open(os.devnull, 'rb', buffering=-1))
-BufferedWriterType = type(open(os.devnull, 'wb', buffering=-1))
-try:
-    from _pyio import open as _open
-    PyTextWrapperType = type(_open(os.devnull, 'r', buffering=-1))
-    PyBufferedRandomType = type(_open(os.devnull, 'r+b', buffering=-1))
-    PyBufferedReaderType = type(_open(os.devnull, 'rb', buffering=-1))
-    PyBufferedWriterType = type(_open(os.devnull, 'wb', buffering=-1))
-except ImportError:
-    PyTextWrapperType = PyBufferedRandomType = PyBufferedReaderType = PyBufferedWriterType = None
-try:
-    from cStringIO import StringIO, InputType, OutputType
-except ImportError:
-    if PY3:
-        from io import BytesIO as StringIO
-    else:
-        from StringIO import StringIO
-    InputType = OutputType = None
-try:
-    __IPYTHON__ is True # is ipython
-    ExitType = None     # IPython.core.autocall.ExitAutocall
-    singletontypes = ['exit', 'quit', 'get_ipython']
-except NameError:
-    try: ExitType = type(exit) # apparently 'exit' can be removed
-    except NameError: ExitType = None
-    singletontypes = []
-
-### File modes
-# Pickles the file handle, preserving mode. The position of the unpickled
-# object is as for a new file handle.
-HANDLE_FMODE = 0
-# Pickles the file contents, creating a new file if on load the file does
-# not exist. The position = min(pickled position, EOF) and mode is chosen
-# as such that "best" preserves behavior of the original file.
-CONTENTS_FMODE = 1
-# Pickles the entire file (handle and contents), preserving mode and position.
-FILE_FMODE = 2
-
-### Shorthands (modified from python2.5/lib/pickle.py)
-def copy(obj, *args, **kwds):
-    """use pickling to 'copy' an object"""
-    return loads(dumps(obj, *args, **kwds))
-
-def dump(obj, file, protocol=None, byref=False, fmode=HANDLE_FMODE):#, strictio=False):
-    """pickle an object to a file"""
-    strictio = False #FIXME: strict=True needs cleanup
-    if protocol is None: protocol = DEFAULT_PROTOCOL
-    pik = Pickler(file, protocol)
-    pik._main_module = _main_module
-    # save settings
-    _byref = pik._byref
-    _strictio = pik._strictio
-    _fmode = pik._fmode
-    # apply kwd settings
-    pik._byref = bool(byref)
-    pik._strictio = bool(strictio)
-    pik._fmode = fmode
-    # hack to catch subclassed numpy array instances
-    if NumpyArrayType and ndarrayinstance(obj):
-        @register(type(obj))
-        def save_numpy_array(pickler, obj):
-            log.info("Nu: (%s, %s)" % (obj.shape,obj.dtype))
-            npdict = getattr(obj, '__dict__', None)
-            f, args, state = obj.__reduce__()
-            pik.save_reduce(_create_array, (f, args, state, npdict), obj=obj)
-            return
-    # end hack
-    pik.dump(obj)
-    # return to saved settings
-    pik._byref = _byref
-    pik._strictio = _strictio
-    pik._fmode = _fmode
-    return
-
-def dumps(obj, protocol=None, byref=False, fmode=HANDLE_FMODE):#, strictio=False):
-    """pickle an object to a string"""
-    file = StringIO()
-    dump(obj, file, protocol, byref, fmode)#, strictio)
-    return file.getvalue()
-
-def load(file):
-    """unpickle an object from a file"""
-    pik = Unpickler(file)
-    pik._main_module = _main_module
-    obj = pik.load()
-    if type(obj).__module__ == _main_module.__name__: # point obj class to main
-        try: obj.__class__ == getattr(pik._main_module, type(obj).__name__)
-        except AttributeError: pass # defined in a file
-   #_main_module.__dict__.update(obj.__dict__) #XXX: should update globals ?
-    return obj
-
-def loads(str):
-    """unpickle an object from a string"""
-    file = StringIO(str)
-    return load(file)
-
-# def dumpzs(obj, protocol=None):
-#     """pickle an object to a compressed string"""
-#     return zlib.compress(dumps(obj, protocol))
-
-# def loadzs(str):
-#     """unpickle an object from a compressed string"""
-#     return loads(zlib.decompress(str))
-
-### End: Shorthands ###
-
-### Pickle the Interpreter Session
-def dump_session(filename='/tmp/session.pkl', main_module=_main_module):
-    """pickle the current state of __main__ to a file"""
-    f = open(filename, 'wb')
-    try:
-        pickler = Pickler(f, 2)
-        pickler._main_module = main_module
-        _byref = pickler._byref
-        pickler._byref = False  # disable pickling by name reference
-        pickler._session = True # is best indicator of when pickling a session
-        pickler.dump(main_module)
-        pickler._session = False
-        pickler._byref = _byref
-    finally:
-        f.close()
-    return
-
-def load_session(filename='/tmp/session.pkl', main_module=_main_module):
-    """update the __main__ module with the state from the session file"""
-    f = open(filename, 'rb')
-    try:
-        unpickler = Unpickler(f)
-        unpickler._main_module = main_module
-        unpickler._session = True
-        module = unpickler.load()
-        unpickler._session = False
-        main_module.__dict__.update(module.__dict__)
-    finally:
-        f.close()
-    return
-
-### End: Pickle the Interpreter
-
-### Extend the Picklers
-class Pickler(StockPickler):
-    """python's Pickler extended to interpreter sessions"""
-    dispatch = StockPickler.dispatch.copy()
-    _main_module = None
-    _session = False
-    _byref = False
-    _strictio = False
-    _fmode = HANDLE_FMODE
-    pass
-
-    def __init__(self, *args, **kwargs):
-        StockPickler.__init__(self, *args, **kwargs)
-        self._main_module = _main_module
-        self._diff_cache = {}
-
-class Unpickler(StockUnpickler):
-    """python's Unpickler extended to interpreter sessions and more types"""
-    _main_module = None
-    _session = False
-
-    def find_class(self, module, name):
-        if (module, name) == ('__builtin__', '__main__'):
-            return self._main_module.__dict__ #XXX: above set w/save_module_dict
-        return StockUnpickler.find_class(self, module, name)
-    pass
-
-    def __init__(self, *args, **kwargs):
-        StockUnpickler.__init__(self, *args, **kwargs)
-        self._main_module = _main_module
-
-'''
-def dispatch_table():
-    """get the dispatch table of registered types"""
-    return Pickler.dispatch
-'''
-
-pickle_dispatch_copy = StockPickler.dispatch.copy()
-
-def pickle(t, func):
-    """expose dispatch table for user-created extensions"""
-    Pickler.dispatch[t] = func
-    return
-
-def register(t):
-    def proxy(func):
-        Pickler.dispatch[t] = func
-        return func
-    return proxy
-
-def _revert_extension():
-    for type, func in list(StockPickler.dispatch.items()):
-        if func.__module__ == __name__:
-            del StockPickler.dispatch[type]
-            if type in pickle_dispatch_copy:
-                StockPickler.dispatch[type] = pickle_dispatch_copy[type]
-
-def use_diff(on=True):
-    """
-    reduces size of pickles by only including object which have changed.
-    Decreases pickle size but increases CPU time needed.
-    Also helps avoid some unpicklable objects.
-    MUST be called at start of script, otherwise changes will not be recorded.
-    """
-    global _use_diff, diff
-    _use_diff = on
-    if _use_diff and diff is None:
-        try:
-            from . import diff as d
-        except:
-            import diff as d
-        diff = d
-
-def _create_typemap():
-    import types
-    if PY3:
-        d = dict(list(__builtin__.__dict__.items()) + \
-                 list(types.__dict__.items())).items()
-        builtin = 'builtins'
-    else:
-        d = types.__dict__.iteritems()
-        builtin = '__builtin__'
-    for key, value in d:
-        if getattr(value, '__module__', None) == builtin \
-        and type(value) is type:
-            yield key, value
-    return
-_reverse_typemap = dict(_create_typemap())
-_reverse_typemap.update({
-    'CellType': CellType,
-    'WrapperDescriptorType': WrapperDescriptorType,
-    'MethodDescriptorType': MethodDescriptorType,
-    'MethodWrapperType': MethodWrapperType,
-    'PartialType': PartialType,
-    'SuperType': SuperType,
-    'ItemGetterType': ItemGetterType,
-    'AttrGetterType': AttrGetterType,
-    'FileType': FileType,
-    'BufferedRandomType': BufferedRandomType,
-    'BufferedReaderType': BufferedReaderType,
-    'BufferedWriterType': BufferedWriterType,
-    'TextWrapperType': TextWrapperType,
-    'PyBufferedRandomType': PyBufferedRandomType,
-    'PyBufferedReaderType': PyBufferedReaderType,
-    'PyBufferedWriterType': PyBufferedWriterType,
-    'PyTextWrapperType': PyTextWrapperType,
-})
-if ExitType:
-    _reverse_typemap['ExitType'] = ExitType
-if InputType:
-    _reverse_typemap['InputType'] = InputType
-    _reverse_typemap['OutputType'] = OutputType
-if PY3:
-    _typemap = dict((v, k) for k, v in _reverse_typemap.items())
-else:
-    _typemap = dict((v, k) for k, v in _reverse_typemap.iteritems())
-
-def _unmarshal(string):
-    return marshal.loads(string)
-
-def _load_type(name):
-    return _reverse_typemap[name]
-
-def _create_type(typeobj, *args):
-    return typeobj(*args)
-
-def _create_function(fcode, fglobals, fname=None, fdefaults=None, \
-                                      fclosure=None, fdict=None):
-    # same as FunctionType, but enable passing __dict__ to new function,
-    # __dict__ is the storehouse for attributes added after function creation
-    if fdict is None: fdict = dict()
-    func = FunctionType(fcode, fglobals, fname, fdefaults, fclosure)
-    func.__dict__.update(fdict) #XXX: better copy? option to copy?
-    return func
-
-def _create_ftype(ftypeobj, func, args, kwds):
-    if kwds is None:
-        kwds = {}
-    if args is None:
-        args = ()
-    return ftypeobj(func, *args, **kwds)
-
-def _create_lock(locked, *args):
-    from threading import Lock
-    lock = Lock()
-    if locked:
-        if not lock.acquire(False):
-            raise UnpicklingError("Cannot acquire lock")
-    return lock
-
-# thanks to matsjoyce for adding all the different file modes
-def _create_filehandle(name, mode, position, closed, open, strictio, fmode, fdata): # buffering=0
-    # only pickles the handle, not the file contents... good? or StringIO(data)?
-    # (for file contents see: http://effbot.org/librarybook/copy-reg.htm)
-    # NOTE: handle special cases first (are there more special cases?)
-    names = {'<stdin>':sys.__stdin__, '<stdout>':sys.__stdout__,
-             '<stderr>':sys.__stderr__} #XXX: better fileno=(0,1,2) ?
-    if name in list(names.keys()):
-        f = names[name] #XXX: safer "f=sys.stdin"
-    elif name == '<tmpfile>':
-        f = os.tmpfile()
-    elif name == '<fdopen>':
-        import tempfile
-        f = tempfile.TemporaryFile(mode)
-    else:
-        # treat x mode as w mode
-        if "x" in mode and sys.hexversion < 0x03030000:
-            raise ValueError("invalid mode: '%s'" % mode)
-
-        if not os.path.exists(name):
-            if strictio:
-                raise FileNotFoundError("[Errno 2] No such file or directory: '%s'" % name)
-            elif "r" in mode and fmode != FILE_FMODE:
-                name = '<fdopen>' # or os.devnull?
-            current_size = 0 # or maintain position?
-        else:
-            current_size = os.path.getsize(name)
-
-        if position > current_size:
-            if strictio:
-                raise ValueError("invalid buffer size")
-            elif fmode == CONTENTS_FMODE:
-                position = current_size
-        # try to open the file by name
-        # NOTE: has different fileno
-        try:
-            #FIXME: missing: *buffering*, encoding, softspace
-            if fmode == FILE_FMODE:
-                f = open(name, mode if "w" in mode else "w")
-                f.write(fdata)
-                if "w" not in mode:
-                    f.close()
-                    f = open(name, mode)
-            elif name == '<fdopen>': # file did not exist
-                import tempfile
-                f = tempfile.TemporaryFile(mode)
-            elif fmode == CONTENTS_FMODE \
-               and ("w" in mode or "x" in mode):
-                # stop truncation when opening
-                flags = os.O_CREAT
-                if "+" in mode:
-                    flags |= os.O_RDWR
-                else:
-                    flags |= os.O_WRONLY
-                f = os.fdopen(os.open(name, flags), mode)
-                # set name to the correct value
-                if PY3:
-                    r = getattr(f, "buffer", f)
-                    r = getattr(r, "raw", r)
-                    r.name = name
-                else:
-                    class FILE(ctypes.Structure):
-                        _fields_ = [("refcount", ctypes.c_long),
-                                    ("type_obj", ctypes.py_object),
-                                    ("file_pointer", ctypes.c_voidp),
-                                    ("name", ctypes.py_object)]
-
-                    class PyObject(ctypes.Structure):
-                        _fields_ = [
-                            ("ob_refcnt", ctypes.c_int),
-                            ("ob_type", ctypes.py_object)
-                            ]
-                    if not HAS_CTYPES:
-                        raise ImportError("No module named 'ctypes'")
-                    ctypes.cast(id(f), ctypes.POINTER(FILE)).contents.name = name
-                    ctypes.cast(id(name), ctypes.POINTER(PyObject)).contents.ob_refcnt += 1
-                assert f.name == name
-            else:
-                f = open(name, mode)
-        except (IOError, FileNotFoundError):
-            err = sys.exc_info()[1]
-            raise UnpicklingError(err)
-    if closed:
-        f.close()
-    elif position >= 0 and fmode != HANDLE_FMODE:
-        f.seek(position)
-    return f
-
-def _create_stringi(value, position, closed):
-    f = StringIO(value)
-    if closed: f.close()
-    else: f.seek(position)
-    return f
-
-def _create_stringo(value, position, closed):
-    f = StringIO()
-    if closed: f.close()
-    else:
-       f.write(value)
-       f.seek(position)
-    return f
-
-class _itemgetter_helper(object):
-    def __init__(self):
-        self.items = []
-    def __getitem__(self, item):
-        self.items.append(item)
-        return
-
-class _attrgetter_helper(object):
-    def __init__(self, attrs, index=None):
-        self.attrs = attrs
-        self.index = index
-    def __getattribute__(self, attr):
-        attrs = object.__getattribute__(self, "attrs")
-        index = object.__getattribute__(self, "index")
-        if index is None:
-            index = len(attrs)
-            attrs.append(attr)
-        else:
-            attrs[index] = ".".join([attrs[index], attr])
-        return type(self)(attrs, index)
-
-if HAS_CTYPES:
-    ctypes.pythonapi.PyCell_New.restype = ctypes.py_object
-    ctypes.pythonapi.PyCell_New.argtypes = [ctypes.py_object]
-    # thanks to Paul Kienzle for cleaning the ctypes CellType logic
-    def _create_cell(contents):
-        return ctypes.pythonapi.PyCell_New(contents)
-
-def _create_weakref(obj, *args):
-    from weakref import ref
-    if obj is None: # it's dead
-        if PY3:
-            from collections import UserDict
-        else:
-            from UserDict import UserDict
-        return ref(UserDict(), *args)
-    return ref(obj, *args)
-
-def _create_weakproxy(obj, callable=False, *args):
-    from weakref import proxy
-    if obj is None: # it's dead
-        if callable: return proxy(lambda x:x, *args)
-        if PY3:
-            from collections import UserDict
-        else:
-            from UserDict import UserDict
-        return proxy(UserDict(), *args)
-    return proxy(obj, *args)
-
-def _eval_repr(repr_str):
-    return eval(repr_str)
-
-def _create_array(f, args, state, npdict=None):
-   #array = numpy.core.multiarray._reconstruct(*args)
-    array = f(*args)
-    array.__setstate__(state)
-    if npdict is not None: # we also have saved state in __dict__
-        array.__dict__.update(npdict)
-    return array
-
-def _getattr(objclass, name, repr_str):
-    # hack to grab the reference directly
-    try: #XXX: works only for __builtin__ ?
-        attr = repr_str.split("'")[3]
-        return eval(attr+'.__dict__["'+name+'"]')
-    except:
-        attr = getattr(objclass,name)
-        if name == '__dict__':
-            attr = attr[name]
-        return attr
-
-def _get_attr(self, name):
-    # stop recursive pickling
-    return getattr(self, name)
-
-def _dict_from_dictproxy(dictproxy):
-    _dict = dictproxy.copy() # convert dictproxy to dict
-    _dict.pop('__dict__', None)
-    _dict.pop('__weakref__', None)
-    return _dict
-
-def _import_module(import_name, safe=False):
-    try:
-        if '.' in import_name:
-            items = import_name.split('.')
-            module = '.'.join(items[:-1])
-            obj = items[-1]
-        else:
-            return __import__(import_name)
-        return getattr(__import__(module, None, None, [obj]), obj)
-    except (ImportError, AttributeError):
-        if safe:
-            return None
-        raise
-
-def _locate_function(obj, session=False):
-    if obj.__module__ == '__main__': # and session:
-        return False
-    found = _import_module(obj.__module__ + '.' + obj.__name__, safe=True)
-    return found is obj
-
-@register(CodeType)
-def save_code(pickler, obj):
-    log.info("Co: %s" % obj)
-    pickler.save_reduce(_unmarshal, (marshal.dumps(obj),), obj=obj)
-    return
-
-@register(FunctionType)
-def save_function(pickler, obj):
-    if not _locate_function(obj): #, pickler._session):
-        log.info("F1: %s" % obj)
-        if PY3:
-            pickler.save_reduce(_create_function, (obj.__code__, 
-                                obj.__globals__, obj.__name__,
-                                obj.__defaults__, obj.__closure__,
-                                obj.__dict__), obj=obj)
-        else:
-            pickler.save_reduce(_create_function, (obj.func_code,
-                                obj.func_globals, obj.func_name,
-                                obj.func_defaults, obj.func_closure,
-                                obj.__dict__), obj=obj)
-    else:
-        log.info("F2: %s" % obj)
-        StockPickler.save_global(pickler, obj) #NOTE: also takes name=...
-    return
-
-@register(dict)
-def save_module_dict(pickler, obj):
-    if is_dill(pickler) and obj == pickler._main_module.__dict__ and not pickler._session:
-        log.info("D1: <dict%s" % str(obj.__repr__).split('dict')[-1]) # obj
-        if PY3:
-            pickler.write(bytes('c__builtin__\n__main__\n', 'UTF-8'))
-        else:
-            pickler.write('c__builtin__\n__main__\n')
-    elif not is_dill(pickler) and obj == _main_module.__dict__:
-        log.info("D3: <dict%s" % str(obj.__repr__).split('dict')[-1]) # obj
-        if PY3:
-            pickler.write(bytes('c__main__\n__dict__\n', 'UTF-8'))
-        else:
-            pickler.write('c__main__\n__dict__\n')   #XXX: works in general?
-    elif '__name__' in obj and obj != _main_module.__dict__ \
-    and obj is getattr(_import_module(obj['__name__'],True), '__dict__', None):
-        log.info("D4: <dict%s" % str(obj.__repr__).split('dict')[-1]) # obj
-        if PY3:
-            pickler.write(bytes('c%s\n__dict__\n' % obj['__name__'], 'UTF-8'))
-        else:
-            pickler.write('c%s\n__dict__\n' % obj['__name__'])
-    else:
-        log.info("D2: <dict%s" % str(obj.__repr__).split('dict')[-1]) # obj
-        if is_dill(pickler) and pickler._session:
-            # we only care about session the first pass thru
-            pickler._session = False 
-        StockPickler.save_dict(pickler, obj)
-    return
-
-@register(ClassType)
-def save_classobj(pickler, obj):
-    if obj.__module__ == '__main__': #XXX: use _main_module.__name__ everywhere?
-        log.info("C1: %s" % obj)
-        pickler.save_reduce(ClassType, (obj.__name__, obj.__bases__,
-                                        obj.__dict__), obj=obj)
-                                       #XXX: or obj.__dict__.copy()), obj=obj) ?
-    else:
-        log.info("C2: %s" % obj)
-        StockPickler.save_global(pickler, obj)
-    return
-
-@register(LockType)
-def save_lock(pickler, obj):
-    log.info("Lo: %s" % obj)
-    pickler.save_reduce(_create_lock, (obj.locked(),), obj=obj)
-    return
-
-@register(ItemGetterType)
-def save_itemgetter(pickler, obj):
-    log.info("Ig: %s" % obj)
-    helper = _itemgetter_helper()
-    obj(helper)
-    pickler.save_reduce(type(obj), tuple(helper.items), obj=obj)
-    return
-
-@register(AttrGetterType)
-def save_attrgetter(pickler, obj):
-    log.info("Ag: %s" % obj)
-    attrs = []
-    helper = _attrgetter_helper(attrs)
-    obj(helper)
-    pickler.save_reduce(type(obj), tuple(attrs), obj=obj)
-    return
-
-def _save_file(pickler, obj, open_):
-    obj.flush()
-    if obj.closed:
-        position = None
-    else:
-        if obj in (sys.__stdout__, sys.__stderr__, sys.__stdin__):
-            position = -1
-        else:
-            position = obj.tell()
-    if pickler._fmode == FILE_FMODE:
-        f = open_(obj.name, "r")
-        fdata = f.read()
-        f.close()
-    else:
-        fdata = ""
-    strictio = pickler._strictio
-    fmode = pickler._fmode
-    pickler.save_reduce(_create_filehandle, (obj.name, obj.mode, position,
-                                             obj.closed, open_, strictio,
-                                             fmode, fdata), obj=obj)
-    return
-
-
-@register(FileType) #XXX: in 3.x has buffer=0, needs different _create?
-@register(BufferedRandomType)
-@register(BufferedReaderType)
-@register(BufferedWriterType)
-@register(TextWrapperType)
-def save_file(pickler, obj):
-    log.info("Fi: %s" % obj)
-    return _save_file(pickler, obj, open)
-
-if PyTextWrapperType:
-    @register(PyBufferedRandomType)
-    @register(PyBufferedReaderType)
-    @register(PyBufferedWriterType)
-    @register(PyTextWrapperType)
-    def save_file(pickler, obj):
-        log.info("Fi: %s" % obj)
-        return _save_file(pickler, obj, _open)
-
-# The following two functions are based on 'saveCStringIoInput'
-# and 'saveCStringIoOutput' from spickle
-# Copyright (c) 2011 by science+computing ag
-# License: http://www.apache.org/licenses/LICENSE-2.0
-if InputType:
-    @register(InputType)
-    def save_stringi(pickler, obj):
-        log.info("Io: %s" % obj)
-        if obj.closed:
-            value = ''; position = None
-        else:
-            value = obj.getvalue(); position = obj.tell()
-        pickler.save_reduce(_create_stringi, (value, position, \
-                                              obj.closed), obj=obj)
-        return
-
-    @register(OutputType)
-    def save_stringo(pickler, obj):
-        log.info("Io: %s" % obj)
-        if obj.closed:
-            value = ''; position = None
-        else:
-            value = obj.getvalue(); position = obj.tell()
-        pickler.save_reduce(_create_stringo, (value, position, \
-                                              obj.closed), obj=obj)
-        return
-
-@register(PartialType)
-def save_functor(pickler, obj):
-    log.info("Fu: %s" % obj)
-    pickler.save_reduce(_create_ftype, (type(obj), obj.func, obj.args,
-                                        obj.keywords), obj=obj)
-    return
-
-@register(SuperType)
-def save_functor(pickler, obj):
-    log.info("Su: %s" % obj)
-    pickler.save_reduce(super, (obj.__thisclass__, obj.__self__), obj=obj)
-    return
-
-@register(BuiltinMethodType)
-def save_builtin_method(pickler, obj):
-    if obj.__self__ is not None:
-        log.info("B1: %s" % obj)
-        pickler.save_reduce(_get_attr, (obj.__self__, obj.__name__), obj=obj)
-    else:
-        log.info("B2: %s" % obj)
-        StockPickler.save_global(pickler, obj)
-    return
-
-@register(MethodType) #FIXME: fails for 'hidden' or 'name-mangled' classes
-def save_instancemethod0(pickler, obj):# example: cStringIO.StringI
-    log.info("Me: %s" % obj) #XXX: obj.__dict__ handled elsewhere?
-    if PY3:
-        pickler.save_reduce(MethodType, (obj.__func__, obj.__self__), obj=obj)
-    else:
-        pickler.save_reduce(MethodType, (obj.im_func, obj.im_self,
-                                         obj.im_class), obj=obj)
-    return
-
-if sys.hexversion >= 0x20500f0:
-    @register(MemberDescriptorType)
-    @register(GetSetDescriptorType)
-    @register(MethodDescriptorType)
-    @register(WrapperDescriptorType)
-    def save_wrapper_descriptor(pickler, obj):
-        log.info("Wr: %s" % obj)
-        pickler.save_reduce(_getattr, (obj.__objclass__, obj.__name__,
-                                       obj.__repr__()), obj=obj)
-        return
-
-    @register(MethodWrapperType)
-    def save_instancemethod(pickler, obj):
-        log.info("Mw: %s" % obj)
-        pickler.save_reduce(getattr, (obj.__self__, obj.__name__), obj=obj)
-        return
-else:
-    @register(MethodDescriptorType)
-    @register(WrapperDescriptorType)
-    def save_wrapper_descriptor(pickler, obj):
-        log.info("Wr: %s" % obj)
-        pickler.save_reduce(_getattr, (obj.__objclass__, obj.__name__,
-                                       obj.__repr__()), obj=obj)
-        return
-
-if HAS_CTYPES:
-    @register(CellType)
-    def save_cell(pickler, obj):
-        log.info("Ce: %s" % obj)
-        pickler.save_reduce(_create_cell, (obj.cell_contents,), obj=obj)
-        return
- 
-# The following function is based on 'saveDictProxy' from spickle
-# Copyright (c) 2011 by science+computing ag
-# License: http://www.apache.org/licenses/LICENSE-2.0
-@register(DictProxyType)
-def save_dictproxy(pickler, obj):
-    log.info("Dp: %s" % obj)
-    attr = obj.get('__dict__')
-   #pickler.save_reduce(_create_dictproxy, (attr,'nested'), obj=obj)
-    if type(attr) == GetSetDescriptorType and attr.__name__ == "__dict__" \
-    and getattr(attr.__objclass__, "__dict__", None) == obj:
-        pickler.save_reduce(getattr, (attr.__objclass__, "__dict__"), obj=obj)
-        return
-    # all bad below... so throw ReferenceError or TypeError
-    from weakref import ReferenceError
-    raise ReferenceError("%s does not reference a class __dict__" % obj)
-
-@register(SliceType)
-def save_slice(pickler, obj):
-    log.info("Sl: %s" % obj)
-    pickler.save_reduce(slice, (obj.start, obj.stop, obj.step), obj=obj)
-    return
-
-@register(XRangeType)
-@register(EllipsisType)
-@register(NotImplementedType)
-def save_singleton(pickler, obj):
-    log.info("Si: %s" % obj)
-    pickler.save_reduce(_eval_repr, (obj.__repr__(),), obj=obj)
-    return
-
-# thanks to Paul Kienzle for pointing out ufuncs didn't pickle
-if NumpyArrayType:
-    @register(NumpyUfuncType)
-    def save_numpy_ufunc(pickler, obj):
-        log.info("Nu: %s" % obj)
-        StockPickler.save_global(pickler, obj)
-        return
-# NOTE: the above 'save' performs like:
-#   import copy_reg
-#   def udump(f): return f.__name__
-#   def uload(name): return getattr(numpy, name)
-#   copy_reg.pickle(NumpyUfuncType, udump, uload)
-
-def _proxy_helper(obj): # a dead proxy returns a reference to None
-    """get memory address of proxy's reference object"""
-    try: #FIXME: has to be a smarter way to identify if it's a proxy
-        address = int(repr(obj).rstrip('>').split(' at ')[-1], base=16)
-    except ValueError: # has a repr... is thus probably not a proxy
-        address = id(obj)
-    return address
-
-def _locate_object(address, module=None):
-    """get object located at the given memory address (inverse of id(obj))"""
-    special = [None, True, False] #XXX: more...?
-    for obj in special:
-        if address == id(obj): return obj
-    if module:
-        if PY3:
-            objects = iter(module.__dict__.values())
-        else:
-            objects = module.__dict__.itervalues()
-    else: objects = iter(gc.get_objects())
-    for obj in objects:
-        if address == id(obj): return obj
-    # all bad below... nothing found so throw ReferenceError or TypeError
-    from weakref import ReferenceError
-    try: address = hex(address)
-    except TypeError:
-        raise TypeError("'%s' is not a valid memory address" % str(address))
-    raise ReferenceError("Cannot reference object at '%s'" % address)
-
-@register(ReferenceType)
-def save_weakref(pickler, obj):
-    refobj = obj()
-    log.info("R1: %s" % obj)
-   #refobj = ctypes.pythonapi.PyWeakref_GetObject(obj) # dead returns "None"
-    pickler.save_reduce(_create_weakref, (refobj,), obj=obj)
-    return
-
-@register(ProxyType)
-@register(CallableProxyType)
-def save_weakproxy(pickler, obj):
-    refobj = _locate_object(_proxy_helper(obj))
-    try: log.info("R2: %s" % obj)
-    except ReferenceError: log.info("R3: %s" % sys.exc_info()[1])
-   #callable = bool(getattr(refobj, '__call__', None))
-    if type(obj) is CallableProxyType: callable = True
-    else: callable = False
-    pickler.save_reduce(_create_weakproxy, (refobj, callable), obj=obj)
-    return
-
-@register(ModuleType)
-def save_module(pickler, obj):
-    if False: #_use_diff:
-        if obj.__name__ != "dill":
-            try:
-                changed = diff.whats_changed(obj, seen=pickler._diff_cache)[0]
-            except RuntimeError:  # not memorised module, probably part of dill
-                pass
-            else:
-                log.info("M1: %s with diff" % obj)
-                log.info("Diff: %s", changed.keys())
-                pickler.save_reduce(_import_module, (obj.__name__,), obj=obj,
-                                    state=changed)
-                return
-
-        log.info("M2: %s" % obj)
-        pickler.save_reduce(_import_module, (obj.__name__,), obj=obj)
-    else:
-        # if a module file name starts with prefx, it should be a builtin
-        # module, so should be pickled as a reference
-        prefix = getattr(sys, "base_prefix", sys.prefix)
-        std_mod = getattr(obj, "__file__", prefix).startswith(prefix)
-        if obj.__name__ not in ("builtins", "dill") \
-           and not std_mod or is_dill(pickler) and obj is pickler._main_module:
-            log.info("M1: %s" % obj)
-            _main_dict = obj.__dict__.copy() #XXX: better no copy? option to copy?
-            [_main_dict.pop(item, None) for item in singletontypes
-                + ["__builtins__", "__loader__"]]
-            pickler.save_reduce(_import_module, (obj.__name__,), obj=obj,
-                                state=_main_dict)
-        else:
-            log.info("M2: %s" % obj)
-            pickler.save_reduce(_import_module, (obj.__name__,), obj=obj)
-        return
-    return
-
-@register(TypeType)
-def save_type(pickler, obj):
-    if obj in _typemap:
-        log.info("T1: %s" % obj)
-        pickler.save_reduce(_load_type, (_typemap[obj],), obj=obj)
-    elif obj.__module__ == '__main__':
-        try: # use StockPickler for special cases [namedtuple,]
-            [getattr(obj, attr) for attr in ('_fields','_asdict',
-                                             '_make','_replace')]
-            log.info("T6: %s" % obj)
-            StockPickler.save_global(pickler, obj)
-            return
-        except AttributeError: pass
-        if type(obj) == type:
-        #   try: # used when pickling the class as code (or the interpreter)
-            if is_dill(pickler) and not pickler._byref:
-                # thanks to Tom Stepleton pointing out pickler._session unneeded
-                log.info("T2: %s" % obj)
-                _dict = _dict_from_dictproxy(obj.__dict__)
-        #   except: # punt to StockPickler (pickle by class reference)
-            else:
-                log.info("T5: %s" % obj)
-                StockPickler.save_global(pickler, obj)
-                return
-        else:
-            log.info("T3: %s" % obj)
-            _dict = obj.__dict__
-       #print (_dict)
-       #print ("%s\n%s" % (type(obj), obj.__name__))
-       #print ("%s\n%s" % (obj.__bases__, obj.__dict__))
-        pickler.save_reduce(_create_type, (type(obj), obj.__name__,
-                                           obj.__bases__, _dict), obj=obj)
-    else:
-        log.info("T4: %s" % obj)
-       #print (obj.__dict__)
-       #print ("%s\n%s" % (type(obj), obj.__name__))
-       #print ("%s\n%s" % (obj.__bases__, obj.__dict__))
-        StockPickler.save_global(pickler, obj)
-    return
-
-# quick sanity checking
-def pickles(obj,exact=False,safe=False,**kwds):
-    """quick check if object pickles with dill"""
-    if safe: exceptions = (Exception,) # RuntimeError, ValueError
-    else:
-        exceptions = (TypeError, AssertionError, PicklingError, UnpicklingError)
-    try:
-        pik = copy(obj, **kwds)
-        try:
-            result = bool(pik.all() == obj.all())
-        except AttributeError:
-            result = pik == obj
-        if result: return True
-        if not exact:
-            return type(pik) == type(obj)
-        return False
-    except exceptions:
-        return False
-
-# use to protect against missing attributes
-def is_dill(pickler):
-    "check the dill-ness of your pickler"
-    return 'dill' in pickler.__module__
-   #return hasattr(pickler,'_main_module')
-
-def _extend():
-    """extend pickle with all of dill's registered types"""
-    # need to have pickle not choke on _main_module?  use is_dill(pickler)
-    for t,func in Pickler.dispatch.items():
-        try:
-            StockPickler.dispatch[t] = func
-        except: #TypeError, PicklingError, UnpicklingError
-            log.info("skip: %s" % t)
-        else: pass
-    return
-
-del diff, _use_diff, use_diff
-
-# EOF

http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py
deleted file mode 100644
index bf0b557..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py
+++ /dev/null
@@ -1,27 +0,0 @@
-#!/usr/bin/env python
-#
-# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
-# Copyright (c) 2008-2014 California Institute of Technology.
-# License: 3-clause BSD.  The full license text is available at:
-#  - http://trac.mystic.cacr.caltech.edu/project/pathos/browser/dill/LICENSE
-"""
-all Python Standard Library object types (currently: CH 1-15 @ 2.7)
-and some other common object types (i.e. numpy.ndarray)
-
-to load more objects and types, use dill.load_types()
-"""
-
-from __future__ import absolute_import
-
-# non-local import of dill.objects
-from dill import objects
-for _type in objects.keys():
-    exec("%s = type(objects['%s'])" % (_type,_type))
-    
-del objects
-try:
-    del _type
-except NameError:
-    pass
-
-del absolute_import

http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py
deleted file mode 100644
index 25714ea..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py
+++ /dev/null
@@ -1,122 +0,0 @@
-#!/usr/bin/env python
-#
-# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
-# Copyright (c) 2008-2014 California Institute of Technology.
-# License: 3-clause BSD.  The full license text is available at:
-#  - http://trac.mystic.cacr.caltech.edu/project/pathos/browser/dill/LICENSE
-
-from __future__ import absolute_import
-__all__ = ['parent', 'reference', 'at', 'parents', 'children']
-
-import gc
-import sys
-
-from .dill import _proxy_helper as reference
-from .dill import _locate_object as at
-
-def parent(obj, objtype, ignore=()):
-    """
->>> listiter = iter([4,5,6,7])
->>> obj = parent(listiter, list)
->>> obj == [4,5,6,7]  # actually 'is', but don't have handle any longer
-True
-
-NOTE: objtype can be a single type (e.g. int or list) or a tuple of types.
-
-WARNING: if obj is a sequence (e.g. list), may produce unexpected results.
-Parent finds *one* parent (e.g. the last member of the sequence).
-    """
-    depth = 1 #XXX: always looking for the parent (only, right?)
-    chain = parents(obj, objtype, depth, ignore)
-    parent = chain.pop()
-    if parent is obj:
-        return None
-    return parent
-
-
-def parents(obj, objtype, depth=1, ignore=()): #XXX: objtype=object ?
-    """Find the chain of referents for obj. Chain will end with obj.
-
-    objtype: an object type or tuple of types to search for
-    depth: search depth (e.g. depth=2 is 'grandparents')
-    ignore: an object or tuple of objects to ignore in the search
-    """
-    edge_func = gc.get_referents # looking for refs, not back_refs
-    predicate = lambda x: isinstance(x, objtype) # looking for parent type
-   #if objtype is None: predicate = lambda x: True #XXX: in obj.mro() ?
-    ignore = (ignore,) if not hasattr(ignore, '__len__') else ignore
-    ignore = (id(obj) for obj in ignore)
-    chain = find_chain(obj, predicate, edge_func, depth)[::-1]
-    #XXX: should pop off obj... ?
-    return chain
-
-
-def children(obj, objtype, depth=1, ignore=()): #XXX: objtype=object ?
-    """Find the chain of referrers for obj. Chain will start with obj.
-
-    objtype: an object type or tuple of types to search for
-    depth: search depth (e.g. depth=2 is 'grandchildren')
-    ignore: an object or tuple of objects to ignore in the search
-
-    NOTE: a common thing to ignore is all globals, 'ignore=(globals(),)'
-
-    NOTE: repeated calls may yield different results, as python stores
-    the last value in the special variable '_'; thus, it is often good
-    to execute something to replace '_' (e.g. >>> 1+1).
-    """
-    edge_func = gc.get_referrers # looking for back_refs, not refs
-    predicate = lambda x: isinstance(x, objtype) # looking for child type
-   #if objtype is None: predicate = lambda x: True #XXX: in obj.mro() ?
-    ignore = (ignore,) if not hasattr(ignore, '__len__') else ignore
-    ignore = (id(obj) for obj in ignore)
-    chain = find_chain(obj, predicate, edge_func, depth, ignore)
-    #XXX: should pop off obj... ?
-    return chain
-
-
-# more generic helper function (cut-n-paste from objgraph)
-# Source at http://mg.pov.lt/objgraph/
-# Copyright (c) 2008-2010 Marius Gedminas <ma...@pov.lt>
-# Copyright (c) 2010 Stefano Rivera <st...@rivera.za.net>
-# Released under the MIT licence (see objgraph/objgrah.py)
-
-def find_chain(obj, predicate, edge_func, max_depth=20, extra_ignore=()):
-    queue = [obj]
-    depth = {id(obj): 0}
-    parent = {id(obj): None}
-    ignore = set(extra_ignore)
-    ignore.add(id(extra_ignore))
-    ignore.add(id(queue))
-    ignore.add(id(depth))
-    ignore.add(id(parent))
-    ignore.add(id(ignore))
-    ignore.add(id(sys._getframe()))  # this function
-    ignore.add(id(sys._getframe(1))) # find_chain/find_backref_chain, likely
-    gc.collect()
-    while queue:
-        target = queue.pop(0)
-        if predicate(target):
-            chain = [target]
-            while parent[id(target)] is not None:
-                target = parent[id(target)]
-                chain.append(target)
-            return chain
-        tdepth = depth[id(target)]
-        if tdepth < max_depth:
-            referrers = edge_func(target)
-            ignore.add(id(referrers))
-            for source in referrers:
-                if id(source) in ignore:
-                    continue
-                if id(source) not in depth:
-                    depth[id(source)] = tdepth + 1
-                    parent[id(source)] = target
-                    queue.append(source)
-    return [obj] # not found
-
-
-# backward compatability
-refobject = at
-
-
-# EOF