You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mx...@apache.org on 2015/07/30 19:11:18 UTC

[1/3] flink git commit: [FLINK-1927][FLINK-2173][py] Operator distribution rework, fix file paths

Repository: flink
Updated Branches:
  refs/heads/master 1919ae735 -> 68b155931


http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/source.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/source.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/source.py
deleted file mode 100644
index b55ca55..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/source.py
+++ /dev/null
@@ -1,1010 +0,0 @@
-# #!/usr/bin/env python
-#
-# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
-# Copyright (c) 2008-2014 California Institute of Technology.
-# License: 3-clause BSD.  The full license text is available at:
-#  - http://trac.mystic.cacr.caltech.edu/project/pathos/browser/dill/LICENSE
-#
-# inspired by inspect.py from Python-2.7.6
-# inspect.py author: 'Ka-Ping Yee <pi...@lfw.org>'
-# inspect.py merged into original dill.source by Mike McKerns 4/13/14
-"""
-Extensions to python's 'inspect' module, which can be used
-to retrieve information from live python objects. The methods
-defined in this module are augmented to facilitate access to 
-source code of interactively defined functions and classes,
-as well as provide access to source code for objects defined
-in a file.
-"""
-
-from __future__ import absolute_import
-__all__ = ['findsource', 'getsourcelines', 'getsource', 'indent', 'outdent', \
-           '_wrap', 'dumpsource', 'getname', '_namespace', 'getimport', \
-           '_importable', 'importable','isdynamic', 'isfrommain']
-
-import re
-import linecache
-from tokenize import TokenError
-from inspect import ismodule, isclass, ismethod, isfunction, istraceback
-from inspect import isframe, iscode, getfile, getmodule, getsourcefile
-from inspect import getblock, indentsize, isbuiltin
-from .dill import PY3
-
-def isfrommain(obj):
-    "check if object was built in __main__"
-    module = getmodule(obj)
-    if module and module.__name__ == '__main__':
-        return True
-    return False
-
-
-def isdynamic(obj):
-    "check if object was built in the interpreter"
-    try: file = getfile(obj)
-    except TypeError: file = None 
-    if file == '<stdin>' and isfrommain(obj):
-        return True
-    return False
-
-
-def _matchlambda(func, line):
-    """check if lambda object 'func' matches raw line of code 'line'"""
-    from dill.detect import code as getcode
-    from dill.detect import freevars, globalvars, varnames
-    dummy = lambda : '__this_is_a_big_dummy_function__'
-    # process the line (removing leading whitespace, etc)
-    lhs,rhs = line.split('lambda ',1)[-1].split(":", 1) #FIXME: if !1 inputs
-    try: #FIXME: unsafe
-        _ = eval("lambda %s : %s" % (lhs,rhs), globals(),locals())
-    except: _ = dummy
-    # get code objects, for comparison
-    _, code = getcode(_).co_code, getcode(func).co_code
-    # check if func is in closure
-    _f = [line.count(i) for i in freevars(func).keys()]
-    if not _f: # not in closure
-        # check if code matches
-        if _ == code: return True
-        return False
-    # weak check on freevars
-    if not all(_f): return False  #XXX: VERY WEAK
-    # weak check on varnames and globalvars
-    _f = varnames(func)
-    _f = [line.count(i) for i in _f[0]+_f[1]]
-    if _f and not all(_f): return False  #XXX: VERY WEAK
-    _f = [line.count(i) for i in globalvars(func).keys()]
-    if _f and not all(_f): return False  #XXX: VERY WEAK
-    # check if func is a double lambda
-    if (line.count('lambda ') > 1) and (lhs in freevars(func).keys()):
-        _lhs,_rhs = rhs.split('lambda ',1)[-1].split(":",1) #FIXME: if !1 inputs
-        try: #FIXME: unsafe
-            _f = eval("lambda %s : %s" % (_lhs,_rhs), globals(),locals())
-        except: _f = dummy
-        # get code objects, for comparison
-        _, code = getcode(_f).co_code, getcode(func).co_code
-        if len(_) != len(code): return False
-        #NOTE: should be same code same order, but except for 't' and '\x88'
-        _ = set((i,j) for (i,j) in zip(_,code) if i != j)
-        if len(_) != 1: return False #('t','\x88')
-        return True
-    # check indentsize
-    if not indentsize(line): return False #FIXME: is this a good check???
-    # check if code 'pattern' matches
-    #XXX: or pattern match against dis.dis(code)? (or use uncompyle2?)
-    _ = _.split(_[0])  # 't' #XXX: remove matching values if starts the same?
-    _f = code.split(code[0])  # '\x88'
-    #NOTE: should be same code different order, with different first element
-    _ = dict(re.match('([\W\D\S])(.*)', _[i]).groups() for i in range(1,len(_)))
-    _f = dict(re.match('([\W\D\S])(.*)', _f[i]).groups() for i in range(1,len(_f)))
-    if (_.keys() == _f.keys()) and (sorted(_.values()) == sorted(_f.values())):
-        return True
-    return False
-
-
-def findsource(object):
-    """Return the entire source file and starting line number for an object.
-    For interactively-defined objects, the 'file' is the interpreter's history.
-
-    The argument may be a module, class, method, function, traceback, frame,
-    or code object.  The source code is returned as a list of all the lines
-    in the file and the line number indexes a line in that list.  An IOError
-    is raised if the source code cannot be retrieved, while a TypeError is
-    raised for objects where the source code is unavailable (e.g. builtins)."""
-
-    module = getmodule(object)
-    try: file = getfile(module)
-    except TypeError: file = None 
-    # use readline when working in interpreter (i.e. __main__ and not file)
-    if module and module.__name__ == '__main__' and not file:
-        import readline
-        lbuf = readline.get_current_history_length()
-        lines = [readline.get_history_item(i)+'\n' for i in range(1,lbuf)]
-    else:
-        try: # special handling for class instances
-            if not isclass(object) and isclass(type(object)): # __class__
-                file = getfile(module)        
-                sourcefile = getsourcefile(module)
-            else: # builtins fail with a TypeError
-                file = getfile(object)
-                sourcefile = getsourcefile(object)
-        except (TypeError, AttributeError): # fail with better error
-            file = getfile(object)
-            sourcefile = getsourcefile(object)
-        if not sourcefile and file[:1] + file[-1:] != '<>':
-            raise IOError('source code not available')
-        file = sourcefile if sourcefile else file
-
-        module = getmodule(object, file)
-        if module:
-            lines = linecache.getlines(file, module.__dict__)
-        else:
-            lines = linecache.getlines(file)
-
-    if not lines:
-        raise IOError('could not get source code')
-
-    #FIXME: all below may fail if exec used (i.e. exec('f = lambda x:x') )
-    if ismodule(object):
-        return lines, 0
-
-    name = pat1 = obj = ''
-    pat2 = r'^(\s*@)'
-#   pat1b = r'^(\s*%s\W*=)' % name #FIXME: finds 'f = decorate(f)', not exec
-    if ismethod(object):
-        name = object.__name__
-        if name == '<lambda>': pat1 = r'(.*(?<!\w)lambda(:|\s))'
-        else: pat1 = r'^(\s*def\s)'
-        if PY3: object = object.__func__
-        else: object = object.im_func
-    if isfunction(object):
-        name = object.__name__
-        if name == '<lambda>':
-            pat1 = r'(.*(?<!\w)lambda(:|\s))'
-            obj = object #XXX: better a copy?
-        else: pat1 = r'^(\s*def\s)'
-        if PY3: object = object.__code__
-        else: object = object.func_code
-    if istraceback(object):
-        object = object.tb_frame
-    if isframe(object):
-        object = object.f_code
-    if iscode(object):
-        if not hasattr(object, 'co_firstlineno'):
-            raise IOError('could not find function definition')
-        stdin = object.co_filename == '<stdin>'
-        if stdin:
-            lnum = len(lines) - 1 # can't get lnum easily, so leverage pat
-            if not pat1: pat1 = r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)'
-        else:
-            lnum = object.co_firstlineno - 1
-            pat1 = r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))|^(\s*@)'
-        pat1 = re.compile(pat1); pat2 = re.compile(pat2)
-       #XXX: candidate_lnum = [n for n in range(lnum) if pat1.match(lines[n])]
-        while lnum > 0: #XXX: won't find decorators in <stdin> ?
-            line = lines[lnum]
-            if pat1.match(line):
-                if not stdin: break # co_firstlineno does the job
-                if name == '<lambda>': # hackery needed to confirm a match
-                    if _matchlambda(obj, line): break
-                else: # not a lambda, just look for the name
-                    if name in line: # need to check for decorator...
-                        hats = 0
-                        for _lnum in range(lnum-1,-1,-1):
-                            if pat2.match(lines[_lnum]): hats += 1
-                            else: break
-                        lnum = lnum - hats
-                        break
-            lnum = lnum - 1
-        return lines, lnum
-
-    try: # turn instances into classes
-        if not isclass(object) and isclass(type(object)): # __class__
-            object = object.__class__ #XXX: sometimes type(class) is better?
-            #XXX: we don't find how the instance was built
-    except AttributeError: pass
-    if isclass(object):
-        name = object.__name__
-        pat = re.compile(r'^(\s*)class\s*' + name + r'\b')
-        # make some effort to find the best matching class definition:
-        # use the one with the least indentation, which is the one
-        # that's most probably not inside a function definition.
-        candidates = []
-        for i in range(len(lines)-1,-1,-1):
-            match = pat.match(lines[i])
-            if match:
-                # if it's at toplevel, it's already the best one
-                if lines[i][0] == 'c':
-                    return lines, i
-                # else add whitespace to candidate list
-                candidates.append((match.group(1), i))
-        if candidates:
-            # this will sort by whitespace, and by line number,
-            # less whitespace first  #XXX: should sort high lnum before low
-            candidates.sort()
-            return lines, candidates[0][1]
-        else:
-            raise IOError('could not find class definition')
-    raise IOError('could not find code object')
-
-
-def getblocks(object, lstrip=False, enclosing=False, locate=False):
-    """Return a list of source lines and starting line number for an object.
-    Interactively-defined objects refer to lines in the interpreter's history.
-
-    If enclosing=True, then also return any enclosing code.
-    If lstrip=True, ensure there is no indentation in the first line of code.
-    If locate=True, then also return the line number for the block of code.
-
-    DEPRECATED: use 'getsourcelines' instead
-    """
-    lines, lnum = findsource(object)
-
-    if ismodule(object):
-        if lstrip: lines = _outdent(lines)
-        return ([lines], [0]) if locate is True else [lines]
-
-    #XXX: 'enclosing' means: closures only? or classes and files?
-    indent = indentsize(lines[lnum])
-    block = getblock(lines[lnum:]) #XXX: catch any TokenError here?
-
-    if not enclosing or not indent:
-        if lstrip: block = _outdent(block)
-        return ([block], [lnum]) if locate is True else [block]
-
-    pat1 = r'^(\s*def\s)|(.*(?<!\w)lambda(:|\s))'; pat1 = re.compile(pat1)
-    pat2 = r'^(\s*@)'; pat2 = re.compile(pat2)
-   #pat3 = r'^(\s*class\s)'; pat3 = re.compile(pat3) #XXX: enclosing class?
-    #FIXME: bound methods need enclosing class (and then instantiation)
-    #       *or* somehow apply a partial using the instance
-
-    skip = 0
-    line = 0
-    blocks = []; _lnum = []
-    target = ''.join(block)
-    while line <= lnum: #XXX: repeat lnum? or until line < lnum?
-        # see if starts with ('def','lambda') and contains our target block
-        if pat1.match(lines[line]):
-            if not skip:
-                try: code = getblock(lines[line:])
-                except TokenError: code = [lines[line]]
-            if indentsize(lines[line]) > indent: #XXX: should be >= ?
-                line += len(code) - skip
-            elif target in ''.join(code):
-                blocks.append(code) # save code block as the potential winner
-                _lnum.append(line - skip) # save the line number for the match
-                line += len(code) - skip
-            else:
-                line += 1
-            skip = 0
-        # find skip: the number of consecutive decorators
-        elif pat2.match(lines[line]):
-            try: code = getblock(lines[line:])
-            except TokenError: code = [lines[line]]
-            skip = 1
-            for _line in code[1:]: # skip lines that are decorators
-                if not pat2.match(_line): break
-                skip += 1
-            line += skip
-        # no match: reset skip and go to the next line
-        else:
-            line +=1
-            skip = 0
-
-    if not blocks:
-        blocks = [block]
-        _lnum = [lnum]
-    if lstrip: blocks = [_outdent(block) for block in blocks]
-    # return last match
-    return (blocks, _lnum) if locate is True else blocks
-
-
-def getsourcelines(object, lstrip=False, enclosing=False):
-    """Return a list of source lines and starting line number for an object.
-    Interactively-defined objects refer to lines in the interpreter's history.
-
-    The argument may be a module, class, method, function, traceback, frame,
-    or code object.  The source code is returned as a list of the lines
-    corresponding to the object and the line number indicates where in the
-    original source file the first line of code was found.  An IOError is
-    raised if the source code cannot be retrieved, while a TypeError is
-    raised for objects where the source code is unavailable (e.g. builtins).
-
-    If lstrip=True, ensure there is no indentation in the first line of code.
-    If enclosing=True, then also return any enclosing code."""
-    code, n = getblocks(object, lstrip=lstrip, enclosing=enclosing, locate=True)
-    return code[-1], n[-1]
-
-
-#NOTE: broke backward compatibility 4/16/14 (was lstrip=True, force=True)
-def getsource(object, alias='', lstrip=False, enclosing=False, \
-                                              force=False, builtin=False):
-    """Return the text of the source code for an object. The source code for
-    interactively-defined objects are extracted from the interpreter's history.
-
-    The argument may be a module, class, method, function, traceback, frame,
-    or code object.  The source code is returned as a single string.  An
-    IOError is raised if the source code cannot be retrieved, while a
-    TypeError is raised for objects where the source code is unavailable
-    (e.g. builtins).
-
-    If alias is provided, then add a line of code that renames the object.
-    If lstrip=True, ensure there is no indentation in the first line of code.
-    If enclosing=True, then also return any enclosing code.
-    If force=True, catch (TypeError,IOError) and try to use import hooks.
-    If builtin=True, force an import for any builtins
-    """
-    # hascode denotes a callable
-    hascode = _hascode(object)
-    # is a class instance type (and not in builtins)
-    instance = _isinstance(object)
-
-    # get source lines; if fail, try to 'force' an import
-    try: # fails for builtins, and other assorted object types
-        lines, lnum = getsourcelines(object, enclosing=enclosing)
-    except (TypeError, IOError): # failed to get source, resort to import hooks
-        if not force: # don't try to get types that findsource can't get
-            raise
-        if not getmodule(object): # get things like 'None' and '1'
-            if not instance: return getimport(object, alias, builtin=builtin)
-            # special handling (numpy arrays, ...)
-            _import = getimport(object, builtin=builtin)
-            name = getname(object, force=True)
-            _alias = "%s = " % alias if alias else ""
-            if alias == name: _alias = ""
-            return _import+_alias+"%s\n" % name
-        else: #FIXME: could use a good bit of cleanup, since using getimport...
-            if not instance: return getimport(object, alias, builtin=builtin)
-            # now we are dealing with an instance...
-            name = object.__class__.__name__
-            module = object.__module__
-            if module in ['builtins','__builtin__']:
-                return getimport(object, alias, builtin=builtin)
-            else: #FIXME: leverage getimport? use 'from module import name'?
-                lines, lnum = ["%s = __import__('%s', fromlist=['%s']).%s\n" % (name,module,name,name)], 0
-                obj = eval(lines[0].lstrip(name + ' = '))
-                lines, lnum = getsourcelines(obj, enclosing=enclosing)
-
-    # strip leading indent (helps ensure can be imported)
-    if lstrip or alias:
-        lines = _outdent(lines)
-
-    # instantiate, if there's a nice repr  #XXX: BAD IDEA???
-    if instance: #and force: #XXX: move into findsource or getsourcelines ?
-        if '(' in repr(object): lines.append('%r\n' % object)
-       #else: #XXX: better to somehow to leverage __reduce__ ?
-       #    reconstructor,args = object.__reduce__()
-       #    _ = reconstructor(*args)
-        else: # fall back to serialization #XXX: bad idea?
-            #XXX: better not duplicate work? #XXX: better new/enclose=True?
-            lines = dumpsource(object, alias='', new=force, enclose=False)
-            lines, lnum = [line+'\n' for line in lines.split('\n')][:-1], 0
-       #else: object.__code__ # raise AttributeError
-
-    # add an alias to the source code
-    if alias:
-        if hascode:
-            skip = 0
-            for line in lines: # skip lines that are decorators
-                if not line.startswith('@'): break
-                skip += 1
-            #XXX: use regex from findsource / getsourcelines ?
-            if lines[skip].lstrip().startswith('def '): # we have a function
-                if alias != object.__name__:
-                    lines.append('\n%s = %s\n' % (alias, object.__name__))
-            elif 'lambda ' in lines[skip]: # we have a lambda
-                if alias != lines[skip].split('=')[0].strip():
-                    lines[skip] = '%s = %s' % (alias, lines[skip])
-            else: # ...try to use the object's name
-                if alias != object.__name__:
-                    lines.append('\n%s = %s\n' % (alias, object.__name__))
-        else: # class or class instance
-            if instance:
-                if alias != lines[-1].split('=')[0].strip():
-                    lines[-1] = ('%s = ' % alias) + lines[-1]
-            else:
-                name = getname(object, force=True) or object.__name__
-                if alias != name:
-                    lines.append('\n%s = %s\n' % (alias, name))
-    return ''.join(lines)
-
-
-def _hascode(object):
-    '''True if object has an attribute that stores it's __code__'''
-    return getattr(object,'__code__',None) or getattr(object,'func_code',None)
-
-def _isinstance(object):
-    '''True if object is a class instance type (and is not a builtin)'''
-    if _hascode(object) or isclass(object) or ismodule(object):
-        return False
-    if istraceback(object) or isframe(object) or iscode(object):
-        return False
-    # special handling (numpy arrays, ...)
-    if not getmodule(object) and getmodule(type(object)).__name__ in ['numpy']:
-        return True
-#   # check if is instance of a builtin
-#   if not getmodule(object) and getmodule(type(object)).__name__ in ['__builtin__','builtins']:
-#       return False
-    _types = ('<class ',"<type 'instance'>")
-    if not repr(type(object)).startswith(_types): #FIXME: weak hack
-        return False
-    if not getmodule(object) or object.__module__ in ['builtins','__builtin__'] or getname(object, force=True) in ['array']:
-        return False
-    return True # by process of elimination... it's what we want
-
-
-def _intypes(object):
-    '''check if object is in the 'types' module'''
-    import types
-    # allow user to pass in object or object.__name__
-    if type(object) is not type(''):
-        object = getname(object, force=True)
-    if object == 'ellipsis': object = 'EllipsisType'
-    return True if hasattr(types, object) else False
-
-
-def _isstring(object): #XXX: isstringlike better?
-    '''check if object is a string-like type'''
-    if PY3: return isinstance(object, (str, bytes))
-    return isinstance(object, basestring)
-
-
-def indent(code, spaces=4):
-    '''indent a block of code with whitespace (default is 4 spaces)'''
-    indent = indentsize(code) 
-    if type(spaces) is int: spaces = ' '*spaces
-    # if '\t' is provided, will indent with a tab
-    nspaces = indentsize(spaces)
-    # blank lines (etc) need to be ignored
-    lines = code.split('\n')
-##  stq = "'''"; dtq = '"""'
-##  in_stq = in_dtq = False
-    for i in range(len(lines)):
-        #FIXME: works... but shouldn't indent 2nd+ lines of multiline doc
-        _indent = indentsize(lines[i])
-        if indent > _indent: continue
-        lines[i] = spaces+lines[i]
-##      #FIXME: may fail when stq and dtq in same line (depends on ordering)
-##      nstq, ndtq = lines[i].count(stq), lines[i].count(dtq)
-##      if not in_dtq and not in_stq:
-##          lines[i] = spaces+lines[i] # we indent
-##          # entering a comment block
-##          if nstq%2: in_stq = not in_stq
-##          if ndtq%2: in_dtq = not in_dtq
-##      # leaving a comment block
-##      elif in_dtq and ndtq%2: in_dtq = not in_dtq
-##      elif in_stq and nstq%2: in_stq = not in_stq
-##      else: pass
-    if lines[-1].strip() == '': lines[-1] = ''
-    return '\n'.join(lines)
-
-
-def _outdent(lines, spaces=None, all=True):
-    '''outdent lines of code, accounting for docs and line continuations'''
-    indent = indentsize(lines[0]) 
-    if spaces is None or spaces > indent or spaces < 0: spaces = indent
-    for i in range(len(lines) if all else 1):
-        #FIXME: works... but shouldn't outdent 2nd+ lines of multiline doc
-        _indent = indentsize(lines[i])
-        if spaces > _indent: _spaces = _indent
-        else: _spaces = spaces
-        lines[i] = lines[i][_spaces:]
-    return lines
-
-def outdent(code, spaces=None, all=True):
-    '''outdent a block of code (default is to strip all leading whitespace)'''
-    indent = indentsize(code) 
-    if spaces is None or spaces > indent or spaces < 0: spaces = indent
-    #XXX: will this delete '\n' in some cases?
-    if not all: return code[spaces:]
-    return '\n'.join(_outdent(code.split('\n'), spaces=spaces, all=all))
-
-
-#XXX: not sure what the point of _wrap is...
-#exec_ = lambda s, *a: eval(compile(s, '<string>', 'exec'), *a)
-__globals__ = globals()
-__locals__ = locals()
-wrap2 = '''
-def _wrap(f):
-    """ encapsulate a function and it's __import__ """
-    def func(*args, **kwds):
-        try:
-            #_ = eval(getsource(f, force=True)) #FIXME: safer, but not as robust
-            exec getimportable(f, alias='_') in %s, %s
-        except:
-            raise ImportError('cannot import name ' + f.__name__)
-        return _(*args, **kwds)
-    func.__name__ = f.__name__
-    func.__doc__ = f.__doc__
-    return func
-''' % ('__globals__', '__locals__')
-wrap3 = '''
-def _wrap(f):
-    """ encapsulate a function and it's __import__ """
-    def func(*args, **kwds):
-        try:
-            #_ = eval(getsource(f, force=True)) #FIXME: safer, but not as robust
-            exec(getimportable(f, alias='_'), %s, %s)
-        except:
-            raise ImportError('cannot import name ' + f.__name__)
-        return _(*args, **kwds)
-    func.__name__ = f.__name__
-    func.__doc__ = f.__doc__
-    return func
-''' % ('__globals__', '__locals__')
-if PY3:
-    exec(wrap3)
-else:
-    exec(wrap2)
-del wrap2, wrap3
-
-
-def _enclose(object, alias=''): #FIXME: needs alias to hold returned object
-    """create a function enclosure around the source of some object"""
-    #XXX: dummy and stub should append a random string
-    dummy = '__this_is_a_big_dummy_enclosing_function__'
-    stub = '__this_is_a_stub_variable__'
-    code = 'def %s():\n' % dummy
-    code += indent(getsource(object, alias=stub, lstrip=True, force=True))
-    code += indent('return %s\n' % stub)
-    if alias: code += '%s = ' % alias
-    code += '%s(); del %s\n' % (dummy, dummy)
-   #code += "globals().pop('%s',lambda :None)()\n" % dummy
-    return code
-
-
-def dumpsource(object, alias='', new=False, enclose=True):
-    """'dump to source', where the code includes a pickled object.
-
-    If new=True and object is a class instance, then create a new
-    instance using the unpacked class source code. If enclose, then
-    create the object inside a function enclosure (thus minimizing
-    any global namespace pollution).
-    """
-    from dill import dumps
-    pik = repr(dumps(object))
-    code = 'import dill\n'
-    if enclose:
-        stub = '__this_is_a_stub_variable__' #XXX: *must* be same _enclose.stub
-        pre = '%s = ' % stub
-        new = False #FIXME: new=True doesn't work with enclose=True
-    else:
-        stub = alias
-        pre = '%s = ' % stub if alias else alias
-    
-    # if a 'new' instance is not needed, then just dump and load
-    if not new or not _isinstance(object):
-        code += pre + 'dill.loads(%s)\n' % pik
-    else: #XXX: other cases where source code is needed???
-        code += getsource(object.__class__, alias='', lstrip=True, force=True)
-        mod = repr(object.__module__) # should have a module (no builtins here)
-        if PY3:
-            code += pre + 'dill.loads(%s.replace(b%s,bytes(__name__,"UTF-8")))\n' % (pik,mod)
-        else:
-            code += pre + 'dill.loads(%s.replace(%s,__name__))\n' % (pik,mod)
-       #code += 'del %s' % object.__class__.__name__ #NOTE: kills any existing!
-
-    if enclose:
-        # generation of the 'enclosure'
-        dummy = '__this_is_a_big_dummy_object__'
-        dummy = _enclose(dummy, alias=alias)
-        # hack to replace the 'dummy' with the 'real' code
-        dummy = dummy.split('\n')
-        code = dummy[0]+'\n' + indent(code) + '\n'.join(dummy[-3:])
-
-    return code #XXX: better 'dumpsourcelines', returning list of lines?
-
-
-def getname(obj, force=False): #XXX: allow 'throw'(?) to raise error on fail?
-    """get the name of the object. for lambdas, get the name of the pointer """
-    module = getmodule(obj)
-    if not module: # things like "None" and "1"
-        if not force: return None
-        return repr(obj)
-    try:
-        #XXX: 'wrong' for decorators and curried functions ?
-        #       if obj.func_closure: ...use logic from getimportable, etc ?
-        name = obj.__name__
-        if name == '<lambda>':
-            return getsource(obj).split('=',1)[0].strip()
-        # handle some special cases
-        if module.__name__ in ['builtins','__builtin__']:
-            if name == 'ellipsis': name = 'EllipsisType'
-        return name
-    except AttributeError: #XXX: better to just throw AttributeError ?
-        if not force: return None
-        name = repr(obj)
-        if name.startswith('<'): # or name.split('('):
-            return None
-        return name
-
-
-def _namespace(obj):
-    """_namespace(obj); return namespace hierarchy (as a list of names)
-    for the given object.  For an instance, find the class hierarchy.
-
-    For example:
-
-    >>> from functools import partial
-    >>> p = partial(int, base=2)
-    >>> _namespace(p)
-    [\'functools\', \'partial\']
-    """
-    # mostly for functions and modules and such
-    #FIXME: 'wrong' for decorators and curried functions
-    try: #XXX: needs some work and testing on different types
-        module = qual = str(getmodule(obj)).split()[1].strip('"').strip("'")
-        qual = qual.split('.')
-        if ismodule(obj):
-            return qual
-        # get name of a lambda, function, etc
-        name = getname(obj) or obj.__name__ # failing, raise AttributeError
-        # check special cases (NoneType, ...)
-        if module in ['builtins','__builtin__']: # BuiltinFunctionType
-            if _intypes(name): return ['types'] + [name]
-        return qual + [name] #XXX: can be wrong for some aliased objects
-    except: pass
-    # special case: numpy.inf and numpy.nan (we don't want them as floats)
-    if str(obj) in ['inf','nan','Inf','NaN']: # is more, but are they needed?
-        return ['numpy'] + [str(obj)]
-    # mostly for classes and class instances and such
-    module = getattr(obj.__class__, '__module__', None)
-    qual = str(obj.__class__)
-    try: qual = qual[qual.index("'")+1:-2]
-    except ValueError: pass # str(obj.__class__) made the 'try' unnecessary
-    qual = qual.split(".")
-    if module in ['builtins','__builtin__']:
-        # check special cases (NoneType, Ellipsis, ...)
-        if qual[-1] == 'ellipsis': qual[-1] = 'EllipsisType'
-        if _intypes(qual[-1]): module = 'types' #XXX: BuiltinFunctionType
-        qual = [module] + qual
-    return qual
-
-
-#NOTE: 05/25/14 broke backward compatability: added 'alias' as 3rd argument
-def _getimport(head, tail, alias='', verify=True, builtin=False):
-    """helper to build a likely import string from head and tail of namespace.
-    ('head','tail') are used in the following context: "from head import tail"
-
-    If verify=True, then test the import string before returning it.
-    If builtin=True, then force an import for builtins where possible.
-    If alias is provided, then rename the object on import.
-    """
-    # special handling for a few common types
-    if tail in ['Ellipsis', 'NotImplemented'] and head in ['types']:
-        head = len.__module__
-    elif tail in ['None'] and head in ['types']:
-        _alias = '%s = ' % alias if alias else ''
-        if alias == tail: _alias = ''
-        return _alias+'%s\n' % tail
-    # we don't need to import from builtins, so return ''
-#   elif tail in ['NoneType','int','float','long','complex']: return '' #XXX: ?
-    if head in ['builtins','__builtin__']:
-        # special cases (NoneType, Ellipsis, ...) #XXX: BuiltinFunctionType
-        if tail == 'ellipsis': tail = 'EllipsisType'
-        if _intypes(tail): head = 'types'
-        elif not builtin:
-            _alias = '%s = ' % alias if alias else ''
-            if alias == tail: _alias = ''
-            return _alias+'%s\n' % tail
-        else: pass # handle builtins below
-    # get likely import string
-    if not head: _str = "import %s" % tail
-    else: _str = "from %s import %s" % (head, tail)
-    _alias = " as %s\n" % alias if alias else "\n"
-    if alias == tail: _alias = "\n"
-    _str += _alias
-    # FIXME: fails on most decorators, currying, and such...
-    #        (could look for magic __wrapped__ or __func__ attr)
-    #        (could fix in 'namespace' to check obj for closure)
-    if verify and not head.startswith('dill.'):# weird behavior for dill
-       #print(_str)
-        try: exec(_str) #XXX: check if == obj? (name collision)
-        except ImportError: #XXX: better top-down or bottom-up recursion?
-            _head = head.rsplit(".",1)[0] #(or get all, then compare == obj?)
-            if not _head: raise
-            if _head != head:
-                _str = _getimport(_head, tail, alias, verify)
-    return _str
-
-
-#XXX: rename builtin to force? vice versa? verify to force? (as in getsource)
-#NOTE: 05/25/14 broke backward compatability: added 'alias' as 2nd argument
-def getimport(obj, alias='', verify=True, builtin=False, enclosing=False):
-    """get the likely import string for the given object
-
-    obj is the object to inspect
-    If verify=True, then test the import string before returning it.
-    If builtin=True, then force an import for builtins where possible.
-    If enclosing=True, get the import for the outermost enclosing callable.
-    If alias is provided, then rename the object on import.
-    """
-    if enclosing:
-        from dill.detect import outermost
-        _obj = outermost(obj)
-        obj = _obj if _obj else obj
-    # get the namespace
-    qual = _namespace(obj)
-    head = '.'.join(qual[:-1])
-    tail = qual[-1]
-    # for named things... with a nice repr #XXX: move into _namespace?
-    try: # look for '<...>' and be mindful it might be in lists, dicts, etc...
-        name = repr(obj).split('<',1)[1].split('>',1)[1]
-        name = None # we have a 'object'-style repr
-    except: # it's probably something 'importable'
-        if head in ['builtins','__builtin__']:
-            name = repr(obj) #XXX: catch [1,2], (1,2), set([1,2])... others?
-        else:
-            name = repr(obj).split('(')[0]
-   #if not repr(obj).startswith('<'): name = repr(obj).split('(')[0]
-   #else: name = None
-    if name: # try using name instead of tail
-        try: return _getimport(head, name, alias, verify, builtin)
-        except ImportError: pass
-        except SyntaxError:
-            if head in ['builtins','__builtin__']:
-                _alias = '%s = ' % alias if alias else ''
-                if alias == name: _alias = ''
-                return _alias+'%s\n' % name
-            else: pass
-    try:
-       #if type(obj) is type(abs): _builtin = builtin # BuiltinFunctionType
-       #else: _builtin = False
-        return _getimport(head, tail, alias, verify, builtin)
-    except ImportError:
-        raise # could do some checking against obj
-    except SyntaxError:
-        if head in ['builtins','__builtin__']:
-            _alias = '%s = ' % alias if alias else ''
-            if alias == tail: _alias = ''
-            return _alias+'%s\n' % tail
-        raise # could do some checking against obj
-
-
-def _importable(obj, alias='', source=None, enclosing=False, force=True, \
-                                              builtin=True, lstrip=True):
-    """get an import string (or the source code) for the given object
-
-    This function will attempt to discover the name of the object, or the repr
-    of the object, or the source code for the object. To attempt to force
-    discovery of the source code, use source=True, to attempt to force the
-    use of an import, use source=False; otherwise an import will be sought
-    for objects not defined in __main__. The intent is to build a string
-    that can be imported from a python file. obj is the object to inspect.
-    If alias is provided, then rename the object with the given alias.
-
-    If source=True, use these options:
-      If enclosing=True, then also return any enclosing code.
-      If force=True, catch (TypeError,IOError) and try to use import hooks.
-      If lstrip=True, ensure there is no indentation in the first line of code.
-
-    If source=False, use these options:
-      If enclosing=True, get the import for the outermost enclosing callable.
-      If force=True, then don't test the import string before returning it.
-      If builtin=True, then force an import for builtins where possible.
-    """
-    if source is None:
-        source = True if isfrommain(obj) else False
-    if source: # first try to get the source
-        try:
-            return getsource(obj, alias, enclosing=enclosing, \
-                             force=force, lstrip=lstrip, builtin=builtin)
-        except: pass
-    try:
-        if not _isinstance(obj):
-            return getimport(obj, alias, enclosing=enclosing, \
-                                  verify=(not force), builtin=builtin)
-        # first 'get the import', then 'get the instance'
-        _import = getimport(obj, enclosing=enclosing, \
-                                 verify=(not force), builtin=builtin)
-        name = getname(obj, force=True)
-        if not name:
-            raise AttributeError("object has no atribute '__name__'")
-        _alias = "%s = " % alias if alias else ""
-        if alias == name: _alias = ""
-        return _import+_alias+"%s\n" % name
-
-    except: pass
-    if not source: # try getsource, only if it hasn't been tried yet
-        try:
-            return getsource(obj, alias, enclosing=enclosing, \
-                             force=force, lstrip=lstrip, builtin=builtin)
-        except: pass
-    # get the name (of functions, lambdas, and classes)
-    # or hope that obj can be built from the __repr__
-    #XXX: what to do about class instances and such?
-    obj = getname(obj, force=force)
-    # we either have __repr__ or __name__ (or None)
-    if not obj or obj.startswith('<'):
-        raise AttributeError("object has no atribute '__name__'")
-    _alias = '%s = ' % alias if alias else ''
-    if alias == obj: _alias = ''
-    return _alias+'%s\n' % obj
-    #XXX: possible failsafe... (for example, for instances when source=False)
-    #     "import dill; result = dill.loads(<pickled_object>); # repr(<object>)"
-
-def _closuredimport(func, alias='', builtin=False):
-    """get import for closured objects; return a dict of 'name' and 'import'"""
-    import re
-    from dill.detect import freevars, outermost
-    free_vars = freevars(func)
-    func_vars = {}
-    # split into 'funcs' and 'non-funcs'
-    for name,obj in list(free_vars.items()):
-        if not isfunction(obj): continue
-        # get import for 'funcs'
-        fobj = free_vars.pop(name)
-        src = getsource(fobj)
-        if src.lstrip().startswith('@'): # we have a decorator
-            src = getimport(fobj, alias=alias, builtin=builtin)
-        else: # we have to "hack" a bit... and maybe be lucky
-            encl = outermost(func)
-            # pattern: 'func = enclosing(fobj'
-            pat = '.*[\w\s]=\s*'+getname(encl)+'\('+getname(fobj)
-            mod = getname(getmodule(encl))
-            #HACK: get file containing 'outer' function; is func there?
-            lines,_ = findsource(encl)
-            candidate = [line for line in lines if getname(encl) in line and \
-                         re.match(pat, line)]
-            if not candidate:
-                mod = getname(getmodule(fobj))
-                #HACK: get file containing 'inner' function; is func there? 
-                lines,_ = findsource(fobj)
-                candidate = [line for line in lines \
-                             if getname(fobj) in line and re.match(pat, line)]
-            if not len(candidate): raise TypeError('import could not be found')
-            candidate = candidate[-1]
-            name = candidate.split('=',1)[0].split()[-1].strip()
-            src = _getimport(mod, name, alias=alias, builtin=builtin)
-        func_vars[name] = src
-    if not func_vars:
-        name = outermost(func)
-        mod = getname(getmodule(name))
-        if not mod or name is func: # then it can be handled by getimport
-            name = getname(func, force=True) #XXX: better key?
-            src = getimport(func, alias=alias, builtin=builtin)
-        else:
-            lines,_ = findsource(name)
-            # pattern: 'func = enclosing('
-            candidate = [line for line in lines if getname(name) in line and \
-                         re.match('.*[\w\s]=\s*'+getname(name)+'\(', line)]
-            if not len(candidate): raise TypeError('import could not be found')
-            candidate = candidate[-1]
-            name = candidate.split('=',1)[0].split()[-1].strip()
-            src = _getimport(mod, name, alias=alias, builtin=builtin)
-        func_vars[name] = src
-    return func_vars
-
-#XXX: should be able to use __qualname__
-def _closuredsource(func, alias=''):
-    """get source code for closured objects; return a dict of 'name'
-    and 'code blocks'"""
-    #FIXME: this entire function is a messy messy HACK
-    #      - pollutes global namespace
-    #      - fails if name of freevars are reused
-    #      - can unnecessarily duplicate function code
-    from dill.detect import freevars
-    free_vars = freevars(func)
-    func_vars = {}
-    # split into 'funcs' and 'non-funcs'
-    for name,obj in list(free_vars.items()):
-        if not isfunction(obj):
-            # get source for 'non-funcs'
-            free_vars[name] = getsource(obj, force=True, alias=name)
-            continue
-        # get source for 'funcs'
-        fobj = free_vars.pop(name)
-        src = getsource(fobj, alias) # DO NOT include dependencies
-        # if source doesn't start with '@', use name as the alias
-        if not src.lstrip().startswith('@'): #FIXME: 'enclose' in dummy;
-            src = importable(fobj,alias=name)#        wrong ref 'name'
-            org = getsource(func, alias, enclosing=False, lstrip=True)
-            src = (src, org) # undecorated first, then target
-        else: #NOTE: reproduces the code!
-            org = getsource(func, enclosing=True, lstrip=False)
-            src = importable(fobj, alias, source=True) # include dependencies
-            src = (org, src) # target first, then decorated
-        func_vars[name] = src
-    src = ''.join(free_vars.values())
-    if not func_vars: #FIXME: 'enclose' in dummy; wrong ref 'name'
-        org = getsource(func, alias, force=True, enclosing=False, lstrip=True)
-        src = (src, org) # variables first, then target
-    else:
-        src = (src, None) # just variables        (better '' instead of None?)
-    func_vars[None] = src
-    # FIXME: remove duplicates (however, order is important...)
-    return func_vars
-
-def importable(obj, alias='', source=None, builtin=True):
-    """get an importable string (i.e. source code or the import string)
-    for the given object, including any required objects from the enclosing
-    and global scope
-
-    This function will attempt to discover the name of the object, or the repr
-    of the object, or the source code for the object. To attempt to force
-    discovery of the source code, use source=True, to attempt to force the
-    use of an import, use source=False; otherwise an import will be sought
-    for objects not defined in __main__. The intent is to build a string
-    that can be imported from a python file.
-
-    obj is the object to inspect. If alias is provided, then rename the
-    object with the given alias. If builtin=True, then force an import for
-    builtins where possible.
-    """
-    #NOTE: we always 'force', and 'lstrip' as necessary
-    #NOTE: for 'enclosing', use importable(outermost(obj))
-    if source is None:
-        source = True if isfrommain(obj) else False
-    elif builtin and isbuiltin(obj):
-        source = False
-    tried_source = tried_import = False
-    while True:
-        if not source: # we want an import
-            try:
-                if _isinstance(obj): # for instances, punt to _importable
-                    return _importable(obj, alias, source=False, builtin=builtin)
-                src = _closuredimport(obj, alias=alias, builtin=builtin)
-                if len(src) == 0:
-                    raise NotImplementedError('not implemented')
-                if len(src) > 1:
-                    raise NotImplementedError('not implemented')
-                return list(src.values())[0]
-            except:
-                if tried_source: raise
-                tried_import = True
-        # we want the source
-        try:
-            src = _closuredsource(obj, alias=alias)
-            if len(src) == 0:
-                raise NotImplementedError('not implemented')
-            # groan... an inline code stitcher
-            def _code_stitcher(block):
-                "stitch together the strings in tuple 'block'"
-                if block[0] and block[-1]: block = '\n'.join(block)
-                elif block[0]: block = block[0]
-                elif block[-1]: block = block[-1]
-                else: block = ''
-                return block
-            # get free_vars first
-            _src = _code_stitcher(src.pop(None))
-            _src = [_src] if _src else []
-            # get func_vars
-            for xxx in src.values():
-                xxx = _code_stitcher(xxx)
-                if xxx: _src.append(xxx)
-            # make a single source string
-            if not len(_src):
-                src = ''
-            elif len(_src) == 1:
-                src = _src[0]
-            else:
-                src = '\n'.join(_src)
-            # get source code of objects referred to by obj in global scope
-            from dill.detect import globalvars
-            obj = globalvars(obj) #XXX: don't worry about alias?
-            obj = list(getsource(_obj,name,force=True) for (name,_obj) in obj.items())
-            obj = '\n'.join(obj) if obj else ''
-            # combine all referred-to source (global then enclosing)
-            if not obj: return src
-            if not src: return obj
-            return obj + src
-        except:
-            if tried_import: raise
-            tried_source = True
-            source = not source
-    # should never get here
-    return
-
-
-# backward compatability
-def getimportable(obj, alias='', byname=True, explicit=False):
-    return importable(obj,alias,source=(not byname),builtin=explicit)
-   #return outdent(_importable(obj,alias,source=(not byname),builtin=explicit))
-def likely_import(obj, passive=False, explicit=False):
-    return getimport(obj, verify=(not passive), builtin=explicit)
-def _likely_import(first, last, passive=False, explicit=True):
-    return _getimport(first, last, verify=(not passive), builtin=explicit)
-_get_name = getname
-getblocks_from_history = getblocks
-
-
-
-# EOF

http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/temp.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/temp.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/temp.py
deleted file mode 100644
index 9dedb41..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/temp.py
+++ /dev/null
@@ -1,236 +0,0 @@
-#!/usr/bin/env python
-#
-# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
-# Copyright (c) 2008-2014 California Institute of Technology.
-# License: 3-clause BSD.  The full license text is available at:
-#  - http://trac.mystic.cacr.caltech.edu/project/pathos/browser/dill/LICENSE
-"""
-Methods for serialized objects (or source code) stored in temporary files
-and file-like objects.
-"""
-#XXX: better instead to have functions write to any given file-like object ?
-#XXX: currently, all file-like objects are created by the function...
-
-from __future__ import absolute_import
-__all__ = ['dump_source', 'dump', 'dumpIO_source', 'dumpIO',\
-           'load_source', 'load', 'loadIO_source', 'loadIO']
-
-from .dill import PY3
-
-def b(x): # deal with b'foo' versus 'foo'
-    import codecs
-    return codecs.latin_1_encode(x)[0]
-
-def load_source(file, **kwds):
-    """load an object that was stored with dill.temp.dump_source
-
-    file: filehandle
-    alias: string name of stored object
-    mode: mode to open the file, one of: {'r', 'rb'}
-
-    >>> f = lambda x: x**2
-    >>> pyfile = dill.temp.dump_source(f, alias='_f')
-    >>> _f = dill.temp.load_source(pyfile)
-    >>> _f(4)
-    16
-    """
-    alias = kwds.pop('alias', None)
-    mode = kwds.pop('mode', 'r')
-    fname = getattr(file, 'name', file) # fname=file.name or fname=file (if str)
-    source = open(fname, mode=mode, **kwds).read()
-    if not alias:
-        tag = source.strip().splitlines()[-1].split()
-        if tag[0] != '#NAME:':
-            stub = source.splitlines()[0]
-            raise IOError("unknown name for code: %s" % stub)
-        alias = tag[-1]
-    local = {}
-    exec(source, local)
-    _ = eval("%s" % alias, local)
-    return _
-
-def dump_source(object, **kwds):
-    """write object source to a NamedTemporaryFile (instead of dill.dump)
-Loads with "import" or "dill.temp.load_source".  Returns the filehandle.
-
-    >>> f = lambda x: x**2
-    >>> pyfile = dill.temp.dump_source(f, alias='_f')
-    >>> _f = dill.temp.load_source(pyfile)
-    >>> _f(4)
-    16
-
-    >>> f = lambda x: x**2
-    >>> pyfile = dill.temp.dump_source(f, dir='.')
-    >>> modulename = os.path.basename(pyfile.name).split('.py')[0]
-    >>> exec('from %s import f as _f' % modulename)
-    >>> _f(4)
-    16
-
-Optional kwds:
-    If 'alias' is specified, the object will be renamed to the given string.
-
-    If 'prefix' is specified, the file name will begin with that prefix,
-    otherwise a default prefix is used.
-    
-    If 'dir' is specified, the file will be created in that directory,
-    otherwise a default directory is used.
-    
-    If 'text' is specified and true, the file is opened in text
-    mode.  Else (the default) the file is opened in binary mode.  On
-    some operating systems, this makes no difference.
-
-NOTE: Keep the return value for as long as you want your file to exist !
-    """ #XXX: write a "load_source"?
-    from .source import importable, getname
-    import tempfile
-    kwds.pop('suffix', '') # this is *always* '.py'
-    alias = kwds.pop('alias', '') #XXX: include an alias so a name is known
-    name = str(alias) or getname(object)
-    name = "\n#NAME: %s\n" % name
-    #XXX: assumes kwds['dir'] is writable and on $PYTHONPATH
-    file = tempfile.NamedTemporaryFile(suffix='.py', **kwds)
-    file.write(b(''.join([importable(object, alias=alias),name])))
-    file.flush()
-    return file
-
-def load(file, **kwds):
-    """load an object that was stored with dill.temp.dump
-
-    file: filehandle
-    mode: mode to open the file, one of: {'r', 'rb'}
-
-    >>> dumpfile = dill.temp.dump([1, 2, 3, 4, 5])
-    >>> dill.temp.load(dumpfile)
-    [1, 2, 3, 4, 5]
-    """
-    import dill as pickle
-    mode = kwds.pop('mode', 'rb')
-    name = getattr(file, 'name', file) # name=file.name or name=file (if str)
-    return pickle.load(open(name, mode=mode, **kwds))
-
-def dump(object, **kwds):
-    """dill.dump of object to a NamedTemporaryFile.
-Loads with "dill.temp.load".  Returns the filehandle.
-
-    >>> dumpfile = dill.temp.dump([1, 2, 3, 4, 5])
-    >>> dill.temp.load(dumpfile)
-    [1, 2, 3, 4, 5]
-
-Optional kwds:
-    If 'suffix' is specified, the file name will end with that suffix,
-    otherwise there will be no suffix.
-    
-    If 'prefix' is specified, the file name will begin with that prefix,
-    otherwise a default prefix is used.
-    
-    If 'dir' is specified, the file will be created in that directory,
-    otherwise a default directory is used.
-    
-    If 'text' is specified and true, the file is opened in text
-    mode.  Else (the default) the file is opened in binary mode.  On
-    some operating systems, this makes no difference.
-
-NOTE: Keep the return value for as long as you want your file to exist !
-    """
-    import dill as pickle
-    import tempfile
-    file = tempfile.NamedTemporaryFile(**kwds)
-    pickle.dump(object, file)
-    file.flush()
-    return file
-
-def loadIO(buffer, **kwds):
-    """load an object that was stored with dill.temp.dumpIO
-
-    buffer: buffer object
-
-    >>> dumpfile = dill.temp.dumpIO([1, 2, 3, 4, 5])
-    >>> dill.temp.loadIO(dumpfile)
-    [1, 2, 3, 4, 5]
-    """
-    import dill as pickle
-    if PY3:
-        from io import BytesIO as StringIO
-    else:
-        from StringIO import StringIO
-    value = getattr(buffer, 'getvalue', buffer) # value or buffer.getvalue
-    if value != buffer: value = value() # buffer.getvalue()
-    return pickle.load(StringIO(value))
-
-def dumpIO(object, **kwds):
-    """dill.dump of object to a buffer.
-Loads with "dill.temp.loadIO".  Returns the buffer object.
-
-    >>> dumpfile = dill.temp.dumpIO([1, 2, 3, 4, 5])
-    >>> dill.temp.loadIO(dumpfile)
-    [1, 2, 3, 4, 5]
-    """
-    import dill as pickle
-    if PY3:
-        from io import BytesIO as StringIO
-    else:
-        from StringIO import StringIO
-    file = StringIO()
-    pickle.dump(object, file)
-    file.flush()
-    return file
-
-def loadIO_source(buffer, **kwds):
-    """load an object that was stored with dill.temp.dumpIO_source
-
-    buffer: buffer object
-    alias: string name of stored object
-
-    >>> f = lambda x:x**2
-    >>> pyfile = dill.temp.dumpIO_source(f, alias='_f')
-    >>> _f = dill.temp.loadIO_source(pyfile)
-    >>> _f(4)
-    16
-    """
-    alias = kwds.pop('alias', None)
-    source = getattr(buffer, 'getvalue', buffer) # source or buffer.getvalue
-    if source != buffer: source = source() # buffer.getvalue()
-    if PY3: source = source.decode() # buffer to string
-    if not alias:
-        tag = source.strip().splitlines()[-1].split()
-        if tag[0] != '#NAME:':
-            stub = source.splitlines()[0]
-            raise IOError("unknown name for code: %s" % stub)
-        alias = tag[-1]
-    local = {}
-    exec(source, local)
-    _ = eval("%s" % alias, local)
-    return _
-
-def dumpIO_source(object, **kwds):
-    """write object source to a buffer (instead of dill.dump)
-Loads by with dill.temp.loadIO_source.  Returns the buffer object.
-
-    >>> f = lambda x:x**2
-    >>> pyfile = dill.temp.dumpIO_source(f, alias='_f')
-    >>> _f = dill.temp.loadIO_source(pyfile)
-    >>> _f(4)
-    16
-
-Optional kwds:
-    If 'alias' is specified, the object will be renamed to the given string.
-    """
-    from .source import importable, getname
-    if PY3:
-        from io import BytesIO as StringIO
-    else:
-        from StringIO import StringIO
-    alias = kwds.pop('alias', '') #XXX: include an alias so a name is known
-    name = str(alias) or getname(object)
-    name = "\n#NAME: %s\n" % name
-    #XXX: assumes kwds['dir'] is writable and on $PYTHONPATH
-    file = StringIO()
-    file.write(b(''.join([importable(object, alias=alias),name])))
-    file.flush()
-    return file
-
-
-del absolute_import
-
-
-# EOF

http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/executor.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/executor.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/executor.py
deleted file mode 100644
index 2cfb9d3..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/executor.py
+++ /dev/null
@@ -1,54 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-import sys
-import socket
-import struct
-#argv[1] = port
-
-s = None
-try:
-    import dill
-    port = int(sys.argv[1])
-
-    s = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
-    s.connect((socket.gethostbyname("localhost"), port))
-
-    size = struct.unpack(">i", s.recv(4, socket.MSG_WAITALL))[0]
-    serialized_operator = s.recv(size, socket.MSG_WAITALL)
-
-    size = struct.unpack(">i", s.recv(4, socket.MSG_WAITALL))[0]
-    import_string = s.recv(size, socket.MSG_WAITALL).decode("utf-8")
-
-    size = struct.unpack(">i", s.recv(4, socket.MSG_WAITALL))[0]
-    input_file = s.recv(size, socket.MSG_WAITALL).decode("utf-8")
-
-    size = struct.unpack(">i", s.recv(4, socket.MSG_WAITALL))[0]
-    output_file = s.recv(size, socket.MSG_WAITALL).decode("utf-8")
-
-    exec(import_string)
-    
-    operator = dill.loads(serialized_operator)
-    operator._configure(input_file, output_file, s)
-    operator._go()
-    sys.stdout.flush()
-    sys.stderr.flush()
-except:
-    sys.stdout.flush()
-    sys.stderr.flush()
-    s.send(struct.pack(">i", -2))
-    raise
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Collector.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Collector.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Collector.py
index 88ada88..bf35756 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Collector.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Collector.py
@@ -120,10 +120,6 @@ class TypedCollector(object):
     def __init__(self, con):
         self._connection = con
 
-    def collectBytes(self, value):
-        size = pack(">I", len(value))
-        self._connection.write(b"".join([Types.TYPE_BYTES, size, value]))
-
     def collect(self, value):
         if not isinstance(value, (list, tuple)):
             self._send_field(value)

http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
index 7ccc995..988bf25 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/connection/Connection.py
@@ -17,7 +17,6 @@
 ################################################################################
 import mmap
 import socket as SOCKET
-import tempfile
 from struct import pack, unpack
 from collections import deque
 import sys
@@ -38,8 +37,8 @@ else:
 
 
 class OneWayBusyBufferingMappedFileConnection(object):
-    def __init__(self):
-        self._output_file = open(tempfile.gettempdir() + "/flink_data/output", "rb+")
+    def __init__(self, output_path):
+        self._output_file = open(output_path, "rb+")
         self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_WRITE)
 
         self._out = deque()
@@ -61,12 +60,13 @@ class OneWayBusyBufferingMappedFileConnection(object):
 
 
 class BufferingTCPMappedFileConnection(object):
-    def __init__(self, input_file=tempfile.gettempdir() + "/flink_data/input", output_file=tempfile.gettempdir() + "/flink_data/output", socket=None):
+    def __init__(self, input_file, output_file, port):
         self._input_file = open(input_file, "rb+")
         self._output_file = open(output_file, "rb+")
         self._file_input_buffer = mmap.mmap(self._input_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_READ)
         self._file_output_buffer = mmap.mmap(self._output_file.fileno(), MAPPED_FILE_SIZE, mmap.MAP_SHARED, mmap.ACCESS_WRITE)
-        self._socket = socket
+        self._socket = SOCKET.socket(family=SOCKET.AF_INET, type=SOCKET.SOCK_STREAM)
+        self._socket.connect((SOCKET.gethostbyname("localhost"), port))
 
         self._out = deque()
         self._out_size = 0
@@ -128,8 +128,8 @@ class BufferingTCPMappedFileConnection(object):
 
 
 class TwinBufferingTCPMappedFileConnection(BufferingTCPMappedFileConnection):
-    def __init__(self, input_file=tempfile.gettempdir() + "/flink/data/input", output_file=tempfile.gettempdir() + "/flink/data/output", socket=None):
-        super(TwinBufferingTCPMappedFileConnection, self).__init__(input_file, output_file, socket)
+    def __init__(self, input_file, output_file, port):
+        super(TwinBufferingTCPMappedFileConnection, self).__init__(input_file, output_file, port)
         self._input = [b"", b""]
         self._input_offset = [0, 0]
         self._input_size = [0, 0]

http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/Function.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/Function.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/Function.py
index dfbb5c5..5323462 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/Function.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/functions/Function.py
@@ -16,7 +16,6 @@
 # limitations under the License.
 ################################################################################
 from abc import ABCMeta, abstractmethod
-import dill
 import sys
 from collections import deque
 from flink.connection import Connection, Iterator, Collector
@@ -32,7 +31,6 @@ class Function(object):
         self._collector = None
         self.context = None
         self._chain_operator = None
-        self._meta = None
 
     def _configure(self, input_file, output_file, port):
         self._connection = Connection.BufferingTCPMappedFileConnection(input_file, output_file, port)
@@ -42,22 +40,15 @@ class Function(object):
 
     def _configure_chain(self, collector):
         if self._chain_operator is not None:
-            frag = self._meta.split("|")
-            if "flink/functions" in frag[0]:#lambda function
-                exec("from flink.functions." + frag[1] + " import " + frag[1])
-            else:
-                self._chain_operator = self._chain_operator.replace(b"__main__", b"plan")
-                exec("from plan import " + frag[1])
-            self._collector = dill.loads(self._chain_operator)
+            self._collector = self._chain_operator
             self._collector.context = self.context
             self._collector._configure_chain(collector)
             self._collector._open()
         else:
             self._collector = collector
 
-    def _chain(self, operator, meta):
+    def _chain(self, operator):
         self._chain_operator = operator
-        self._meta = meta
 
     @abstractmethod
     def _run(self):

http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Environment.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Environment.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Environment.py
index 61c077f..236eda4 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Environment.py
+++ b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/flink/plan/Environment.py
@@ -15,14 +15,13 @@
 #  See the License for the specific language governing permissions and
 # limitations under the License.
 ################################################################################
-import inspect
 from flink.connection import Connection
 from flink.connection import Collector
 from flink.plan.DataSet import DataSet
 from flink.plan.Constants import _Fields, _Identifier
 from flink.utilities import Switch
-import dill
 import copy
+import sys
 
 
 def get_environment():
@@ -34,15 +33,9 @@ def get_environment():
     return Environment()
 
 
-def _dump(function):
-    return dill.dumps(function, protocol=0, byref=True)
-
-
 class Environment(object):
     def __init__(self):
         # util
-        self._connection = Connection.OneWayBusyBufferingMappedFileConnection()
-        self._collector = Collector.TypedCollector(self._connection)
         self._counter = 0
 
         #parameters
@@ -128,8 +121,41 @@ class Environment(object):
         self._parameters.append(("mode", local))
         self._parameters.append(("debug", debug))
         self._optimize_plan()
-        self._send_plan()
-        self._connection._write_buffer()
+
+        plan_mode = sys.stdin.readline().rstrip('\n') == "plan"
+
+        if plan_mode:
+            output_path = sys.stdin.readline().rstrip('\n')
+            self._connection = Connection.OneWayBusyBufferingMappedFileConnection(output_path)
+            self._collector = Collector.TypedCollector(self._connection)
+            self._send_plan()
+            self._connection._write_buffer()
+        else:
+            import struct
+            operator = None
+            try:
+                port = int(sys.stdin.readline().rstrip('\n'))
+
+                id = int(sys.stdin.readline().rstrip('\n'))
+                input_path = sys.stdin.readline().rstrip('\n')
+                output_path = sys.stdin.readline().rstrip('\n')
+
+                operator = None
+                for set in self._sets:
+                    if set[_Fields.ID] == id:
+                        operator = set[_Fields.OPERATOR]
+                    if set[_Fields.ID] == -id:
+                        operator = set[_Fields.COMBINEOP]
+                operator._configure(input_path, output_path, port)
+                operator._go()
+                sys.stdout.flush()
+                sys.stderr.flush()
+            except:
+                sys.stdout.flush()
+                sys.stderr.flush()
+                if operator is not None:
+                    operator._connection._socket.send(struct.pack(">i", -2))
+                raise
 
     def _optimize_plan(self):
         self._find_chains()
@@ -157,8 +183,7 @@ class Environment(object):
                             if parent_type in udf and len(parent[_Fields.CHILDREN]) == 1:
                                 if parent[_Fields.OPERATOR] is not None:
                                     function = child[_Fields.COMBINEOP]
-                                    meta = str(inspect.getmodule(function)) + "|" + str(function.__class__.__name__)
-                                    parent[_Fields.OPERATOR]._chain(_dump(function), meta)
+                                    parent[_Fields.OPERATOR]._chain(function)
                                     child[_Fields.COMBINE] = False
                                     parent[_Fields.NAME] += " -> PythonCombine"
                                     for bcvar in child[_Fields.BCVARS]:
@@ -170,8 +195,7 @@ class Environment(object):
                             parent_op = parent[_Fields.OPERATOR]
                             if parent_op is not None:
                                 function = child[_Fields.OPERATOR]
-                                meta = str(inspect.getmodule(function)) + "|" + str(function.__class__.__name__)
-                                parent_op._chain(_dump(function), meta)
+                                parent_op._chain(function)
                                 parent[_Fields.NAME] += " -> " + child[_Fields.NAME]
                                 parent[_Fields.TYPES] = child[_Fields.TYPES]
                                 for grand_child in child[_Fields.CHILDREN]:
@@ -233,7 +257,6 @@ class Environment(object):
 
     def _send_operations(self):
         collect = self._collector.collect
-        collectBytes = self._collector.collectBytes
         for set in self._sets:
             identifier = set.get(_Fields.IDENTIFIER)
             collect(set[_Fields.IDENTIFIER])
@@ -251,18 +274,11 @@ class Environment(object):
                     collect(set[_Fields.OTHER][_Fields.ID])
                     collect(set[_Fields.KEY1])
                     collect(set[_Fields.KEY2])
-                    collectBytes(_dump(set[_Fields.OPERATOR]))
-                    collect(set[_Fields.META])
                     collect(set[_Fields.TYPES])
                     collect(set[_Fields.NAME])
                     break
                 if case(_Identifier.CROSS, _Identifier.CROSSH, _Identifier.CROSST):
                     collect(set[_Fields.OTHER][_Fields.ID])
-                    if set[_Fields.OPERATOR] is None:
-                        collect(set[_Fields.OPERATOR])
-                    else:
-                        collectBytes(_dump(set[_Fields.OPERATOR]))
-                    collect(set[_Fields.META])
                     collect(set[_Fields.TYPES])
                     collect(len(set[_Fields.PROJECTIONS]))
                     for p in set[_Fields.PROJECTIONS]:
@@ -271,9 +287,6 @@ class Environment(object):
                     collect(set[_Fields.NAME])
                     break
                 if case(_Identifier.REDUCE, _Identifier.GROUPREDUCE):
-                    collectBytes(_dump(set[_Fields.OPERATOR]))
-                    collectBytes(_dump(set[_Fields.COMBINEOP]))
-                    collect(set[_Fields.META])
                     collect(set[_Fields.TYPES])
                     collect(set[_Fields.COMBINE])
                     collect(set[_Fields.NAME])
@@ -282,11 +295,6 @@ class Environment(object):
                     collect(set[_Fields.KEY1])
                     collect(set[_Fields.KEY2])
                     collect(set[_Fields.OTHER][_Fields.ID])
-                    if set[_Fields.OPERATOR] is None:
-                        collect(set[_Fields.OPERATOR])
-                    else:
-                        collectBytes(_dump(set[_Fields.OPERATOR]))
-                    collect(set[_Fields.META])
                     collect(set[_Fields.TYPES])
                     collect(len(set[_Fields.PROJECTIONS]))
                     for p in set[_Fields.PROJECTIONS]:
@@ -295,8 +303,6 @@ class Environment(object):
                     collect(set[_Fields.NAME])
                     break
                 if case(_Identifier.MAP, _Identifier.MAPPARTITION, _Identifier.FLATMAP, _Identifier.FILTER):
-                    collectBytes(_dump(set[_Fields.OPERATOR]))
-                    collect(set[_Fields.META])
                     collect(set[_Fields.TYPES])
                     collect(set[_Fields.NAME])
                     break

http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index fd3a3ca..455153a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -761,8 +761,6 @@ under the License.
 						<exclude>flink-staging/flink-avro/src/test/java/org/apache/flink/api/io/avro/generated/*.java</exclude>
 						<exclude>flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_csv</exclude>
 						<exclude>flink-staging/flink-language-binding/flink-python/src/test/python/org/apache/flink/languagebinding/api/python/flink/test/data_text</exclude>
-						<!-- Python -->						
-						<exclude>flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/**</exclude>
 						<!-- Configuration Files. -->
 						<exclude>**/flink-bin/conf/slaves</exclude>
 						<exclude>**/flink-bin/conf/masters</exclude>


[3/3] flink git commit: [FLINK-1927][FLINK-2173][py] Operator distribution rework, fix file paths

Posted by mx...@apache.org.
[FLINK-1927][FLINK-2173][py] Operator distribution rework, fix file paths

Python operators are no longer serialized and shipped across the
cluster. Instead the plan file is executed on each node, followed by
usage of the respective operator object.

- removed dill library
- filepaths are always explicitly passed to python
- fix error reporting

This closes #931.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68b15593
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68b15593
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68b15593

Branch: refs/heads/master
Commit: 68b1559319a870c3120263c5d10967ff6b690682
Parents: 1919ae7
Author: zentol <s....@web.de>
Authored: Tue Jul 21 21:22:19 2015 +0200
Committer: Maximilian Michels <mx...@apache.org>
Committed: Thu Jul 30 19:09:12 2015 +0200

----------------------------------------------------------------------
 .../api/java/common/streaming/Receiver.java     |    8 +-
 .../api/java/common/streaming/Sender.java       |    6 +-
 .../api/java/python/PythonPlanBinder.java       |   82 +-
 .../java/python/functions/PythonCoGroup.java    |    4 +-
 .../python/functions/PythonCombineIdentity.java |    4 +-
 .../python/functions/PythonMapPartition.java    |    4 +-
 .../java/python/streaming/PythonStreamer.java   |   66 +-
 .../languagebinding/api/python/dill/__diff.py   |  229 ----
 .../languagebinding/api/python/dill/__init__.py |   73 --
 .../languagebinding/api/python/dill/_objects.py |  530 ---------
 .../languagebinding/api/python/dill/detect.py   |  222 ----
 .../languagebinding/api/python/dill/dill.py     | 1034 ------------------
 .../languagebinding/api/python/dill/objtypes.py |   27 -
 .../languagebinding/api/python/dill/pointers.py |  122 ---
 .../languagebinding/api/python/dill/source.py   | 1010 -----------------
 .../languagebinding/api/python/dill/temp.py     |  236 ----
 .../languagebinding/api/python/executor.py      |   54 -
 .../api/python/flink/connection/Collector.py    |    4 -
 .../api/python/flink/connection/Connection.py   |   14 +-
 .../api/python/flink/functions/Function.py      |   13 +-
 .../api/python/flink/plan/Environment.py        |   70 +-
 pom.xml                                         |    2 -
 22 files changed, 110 insertions(+), 3704 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java
index 2741714..23720d7 100644
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java
+++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Receiver.java
@@ -50,11 +50,7 @@ public class Receiver implements Serializable {
 		setupMappedFile(path);
 	}
 
-	private void setupMappedFile(String path) throws FileNotFoundException, IOException {
-		String inputFilePath = function == null
-				? FLINK_TMP_DATA_DIR + "/" + "output"
-				: path;
-
+	private void setupMappedFile(String inputFilePath) throws FileNotFoundException, IOException {
 		File x = new File(FLINK_TMP_DATA_DIR);
 		x.mkdirs();
 
@@ -100,7 +96,7 @@ public class Receiver implements Serializable {
 			count++;
 		}
 		if (fileBuffer.get(0) == 0) {
-			throw new RuntimeException("External process not respoonding.");
+			throw new RuntimeException("External process not responding.");
 		}
 		fileBuffer.position(1);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java
index d45b019..3e0c317 100644
--- a/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java
+++ b/flink-staging/flink-language-binding/flink-language-binding-generic/src/main/java/org/apache/flink/languagebinding/api/java/common/streaming/Sender.java
@@ -65,11 +65,7 @@ public class Sender implements Serializable {
 		setupMappedFile(path);
 	}
 
-	private void setupMappedFile(String path) throws FileNotFoundException, IOException {
-		String outputFilePath = function == null
-				? FLINK_TMP_DATA_DIR + "/" + "input"
-				: path;
-
+	private void setupMappedFile(String outputFilePath) throws FileNotFoundException, IOException {
 		File x = new File(FLINK_TMP_DATA_DIR);
 		x.mkdirs();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
index c278f5c..7c30418 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
+++ b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/PythonPlanBinder.java
@@ -52,7 +52,6 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 
 	public static final String FLINK_PYTHON_DC_ID = "flink";
 	public static final String FLINK_PYTHON_PLAN_NAME = "/plan.py";
-	public static final String FLINK_PYTHON_EXECUTOR_NAME = "/executor.py";
 
 	public static final String FLINK_PYTHON2_BINARY_KEY = "python.binary.python2";
 	public static final String FLINK_PYTHON3_BINARY_KEY = "python.binary.python3";
@@ -64,6 +63,8 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 	protected static final String FLINK_DIR = System.getenv("FLINK_ROOT_DIR");
 	protected static String FULL_PATH;
 
+	public static StringBuilder arguments = new StringBuilder();
+
 	private Process process;
 
 	public static boolean usePython3 = false;
@@ -172,12 +173,11 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 	}
 
 	private void startPython(String[] args) throws IOException {
-		StringBuilder argsBuilder = new StringBuilder();
 		for (String arg : args) {
-			argsBuilder.append(" ").append(arg);
+			arguments.append(" ").append(arg);
 		}
 		receiver = new Receiver(null);
-		receiver.open(null);
+		receiver.open(FLINK_TMP_DATA_DIR + "/output");
 
 		String pythonBinaryPath = usePython3 ? FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
 
@@ -186,7 +186,7 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 		} catch (IOException ex) {
 			throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.");
 		}
-		process = Runtime.getRuntime().exec(pythonBinaryPath + " -B " + FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME + argsBuilder.toString());
+		process = Runtime.getRuntime().exec(pythonBinaryPath + " -B " + FLINK_PYTHON_FILE_PATH + FLINK_PYTHON_PLAN_NAME + arguments.toString());
 
 		new StreamPrinter(process.getInputStream()).start();
 		new StreamPrinter(process.getErrorStream()).start();
@@ -201,8 +201,15 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 			if (value != 0) {
 				throw new RuntimeException("Plan file caused an error. Check log-files for details.");
 			}
+			if (value == 0) {
+				throw new RuntimeException("Plan file exited prematurely without an error.");
+			}
 		} catch (IllegalThreadStateException ise) {//Process still running
 		}
+
+		process.getOutputStream().write("plan\n".getBytes());
+		process.getOutputStream().write((FLINK_TMP_DATA_DIR + "/output\n").getBytes());
+		process.getOutputStream().flush();
 	}
 
 	private void close() {
@@ -231,10 +238,7 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 
 	//=====Plan Binding=================================================================================================
 	protected class PythonOperationInfo extends OperationInfo {
-		protected byte[] operator;
-		protected String meta;
 		protected boolean combine;
-		protected byte[] combineOperator;
 		protected String name;
 
 		@Override
@@ -244,11 +248,8 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 			sb.append("ParentID: ").append(parentID).append("\n");
 			sb.append("OtherID: ").append(otherID).append("\n");
 			sb.append("Name: ").append(name).append("\n");
-			sb.append("Operator: ").append(operator == null ? null : "<operator>").append("\n");
-			sb.append("Meta: ").append(meta).append("\n");
 			sb.append("Types: ").append(types).append("\n");
 			sb.append("Combine: ").append(combine).append("\n");
-			sb.append("CombineOP: ").append(combineOperator == null ? null : "<combineop>").append("\n");
 			sb.append("Keys1: ").append(Arrays.toString(keys1)).append("\n");
 			sb.append("Keys2: ").append(Arrays.toString(keys2)).append("\n");
 			sb.append("Projections: ").append(Arrays.toString(projections)).append("\n");
@@ -264,8 +265,6 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 					otherID = (Integer) receiver.getRecord(true);
 					keys1 = tupleToIntArray((Tuple) receiver.getRecord(true));
 					keys2 = tupleToIntArray((Tuple) receiver.getRecord(true));
-					operator = (byte[]) receiver.getRecord();
-					meta = (String) receiver.getRecord();
 					tmpType = receiver.getRecord();
 					types = tmpType == null ? null : getForObject(tmpType);
 					name = (String) receiver.getRecord();
@@ -274,8 +273,6 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 				case CROSS_H:
 				case CROSS_T:
 					otherID = (Integer) receiver.getRecord(true);
-					operator = (byte[]) receiver.getRecord();
-					meta = (String) receiver.getRecord();
 					tmpType = receiver.getRecord();
 					types = tmpType == null ? null : getForObject(tmpType);
 					int cProjectCount = (Integer) receiver.getRecord(true);
@@ -289,9 +286,6 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 					break;
 				case REDUCE:
 				case GROUPREDUCE:
-					operator = (byte[]) receiver.getRecord();
-					combineOperator = (byte[]) receiver.getRecord();
-					meta = (String) receiver.getRecord();
 					tmpType = receiver.getRecord();
 					types = tmpType == null ? null : getForObject(tmpType);
 					combine = (Boolean) receiver.getRecord();
@@ -303,8 +297,6 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 					keys1 = tupleToIntArray((Tuple) receiver.getRecord(true));
 					keys2 = tupleToIntArray((Tuple) receiver.getRecord(true));
 					otherID = (Integer) receiver.getRecord(true);
-					operator = (byte[]) receiver.getRecord();
-					meta = (String) receiver.getRecord();
 					tmpType = receiver.getRecord();
 					types = tmpType == null ? null : getForObject(tmpType);
 					int jProjectCount = (Integer) receiver.getRecord(true);
@@ -320,8 +312,6 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 				case FLATMAP:
 				case MAP:
 				case FILTER:
-					operator = (byte[]) receiver.getRecord();
-					meta = (String) receiver.getRecord();
 					tmpType = receiver.getRecord();
 					types = tmpType == null ? null : getForObject(tmpType);
 					name = (String) receiver.getRecord();
@@ -344,7 +334,7 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 				op2,
 				new Keys.ExpressionKeys(firstKeys, op1.getType()),
 				new Keys.ExpressionKeys(secondKeys, op2.getType()),
-				new PythonCoGroup(info.setID, info.operator, info.types, info.meta),
+				new PythonCoGroup(info.setID, info.types),
 				info.types, info.name);
 	}
 
@@ -353,13 +343,13 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 		switch (mode) {
 			case NONE:
 				return op1.cross(op2).name("PythonCrossPreStep")
-						.mapPartition(new PythonMapPartition(info.setID, info.operator, info.types, info.meta)).name(info.name);
+						.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name);
 			case HUGE:
 				return op1.crossWithHuge(op2).name("PythonCrossPreStep")
-						.mapPartition(new PythonMapPartition(info.setID, info.operator, info.types, info.meta)).name(info.name);
+						.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name);
 			case TINY:
 				return op1.crossWithTiny(op2).name("PythonCrossPreStep")
-						.mapPartition(new PythonMapPartition(info.setID, info.operator, info.types, info.meta)).name(info.name);
+						.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name);
 			default:
 				throw new IllegalArgumentException("Invalid Cross mode specified: " + mode);
 		}
@@ -367,25 +357,25 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 
 	@Override
 	protected DataSet applyFilterOperation(DataSet op1, PythonOperationInfo info) {
-		return op1.mapPartition(new PythonMapPartition(info.setID, info.operator, info.types, info.meta)).name(info.name);
+		return op1.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name);
 	}
 
 	@Override
 	protected DataSet applyFlatMapOperation(DataSet op1, PythonOperationInfo info) {
-		return op1.mapPartition(new PythonMapPartition(info.setID, info.operator, info.types, info.meta)).name(info.name);
+		return op1.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name);
 	}
 
 	@Override
 	protected DataSet applyGroupReduceOperation(DataSet op1, PythonOperationInfo info) {
 		if (info.combine) {
-			return op1.reduceGroup(new PythonCombineIdentity(info.setID, info.combineOperator, info.meta))
+			return op1.reduceGroup(new PythonCombineIdentity(info.setID * -1))
 					.setCombinable(true).name("PythonCombine")
-					.mapPartition(new PythonMapPartition(info.setID * -1, info.operator, info.types, info.meta))
+					.mapPartition(new PythonMapPartition(info.setID, info.types))
 					.name(info.name);
 		} else {
 			return op1.reduceGroup(new PythonCombineIdentity())
 					.setCombinable(false).name("PythonGroupReducePreStep")
-					.mapPartition(new PythonMapPartition(info.setID * -1, info.operator, info.types, info.meta))
+					.mapPartition(new PythonMapPartition(info.setID, info.types))
 					.name(info.name);
 		}
 	}
@@ -393,14 +383,14 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 	@Override
 	protected DataSet applyGroupReduceOperation(UnsortedGrouping op1, PythonOperationInfo info) {
 		if (info.combine) {
-			return op1.reduceGroup(new PythonCombineIdentity(info.setID, info.combineOperator, info.meta))
+			return op1.reduceGroup(new PythonCombineIdentity(info.setID * -1))
 					.setCombinable(true).name("PythonCombine")
-					.mapPartition(new PythonMapPartition(info.setID * -1, info.operator, info.types, info.meta))
+					.mapPartition(new PythonMapPartition(info.setID, info.types))
 					.name(info.name);
 		} else {
 			return op1.reduceGroup(new PythonCombineIdentity())
 					.setCombinable(false).name("PythonGroupReducePreStep")
-					.mapPartition(new PythonMapPartition(info.setID * -1, info.operator, info.types, info.meta))
+					.mapPartition(new PythonMapPartition(info.setID, info.types))
 					.name(info.name);
 		}
 	}
@@ -408,14 +398,14 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 	@Override
 	protected DataSet applyGroupReduceOperation(SortedGrouping op1, PythonOperationInfo info) {
 		if (info.combine) {
-			return op1.reduceGroup(new PythonCombineIdentity(info.setID, info.combineOperator, info.meta))
+			return op1.reduceGroup(new PythonCombineIdentity(info.setID * -1))
 					.setCombinable(true).name("PythonCombine")
-					.mapPartition(new PythonMapPartition(info.setID * -1, info.operator, info.types, info.meta))
+					.mapPartition(new PythonMapPartition(info.setID, info.types))
 					.name(info.name);
 		} else {
 			return op1.reduceGroup(new PythonCombineIdentity())
 					.setCombinable(false).name("PythonGroupReducePreStep")
-					.mapPartition(new PythonMapPartition(info.setID * -1, info.operator, info.types, info.meta))
+					.mapPartition(new PythonMapPartition(info.setID, info.types))
 					.name(info.name);
 		}
 	}
@@ -425,13 +415,13 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 		switch (mode) {
 			case NONE:
 				return op1.join(op2).where(firstKeys).equalTo(secondKeys).name("PythonJoinPreStep")
-						.mapPartition(new PythonMapPartition(info.setID, info.operator, info.types, info.meta)).name(info.name);
+						.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name);
 			case HUGE:
 				return op1.joinWithHuge(op2).where(firstKeys).equalTo(secondKeys).name("PythonJoinPreStep")
-						.mapPartition(new PythonMapPartition(info.setID, info.operator, info.types, info.meta)).name(info.name);
+						.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name);
 			case TINY:
 				return op1.joinWithTiny(op2).where(firstKeys).equalTo(secondKeys).name("PythonJoinPreStep")
-						.mapPartition(new PythonMapPartition(info.setID, info.operator, info.types, info.meta)).name(info.name);
+						.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name);
 			default:
 				throw new IllegalArgumentException("Invalid join mode specified.");
 		}
@@ -439,33 +429,33 @@ public class PythonPlanBinder extends PlanBinder<PythonOperationInfo> {
 
 	@Override
 	protected DataSet applyMapOperation(DataSet op1, PythonOperationInfo info) {
-		return op1.mapPartition(new PythonMapPartition(info.setID, info.operator, info.types, info.meta)).name(info.name);
+		return op1.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name);
 	}
 
 	@Override
 	protected DataSet applyMapPartitionOperation(DataSet op1, PythonOperationInfo info) {
-		return op1.mapPartition(new PythonMapPartition(info.setID, info.operator, info.types, info.meta)).name(info.name);
+		return op1.mapPartition(new PythonMapPartition(info.setID, info.types)).name(info.name);
 	}
 
 	@Override
 	protected DataSet applyReduceOperation(DataSet op1, PythonOperationInfo info) {
 		return op1.reduceGroup(new PythonCombineIdentity())
 				.setCombinable(false).name("PythonReducePreStep")
-				.mapPartition(new PythonMapPartition(info.setID * -1, info.operator, info.types, info.meta))
+				.mapPartition(new PythonMapPartition(info.setID, info.types))
 				.name(info.name);
 	}
 
 	@Override
 	protected DataSet applyReduceOperation(UnsortedGrouping op1, PythonOperationInfo info) {
 		if (info.combine) {
-			return op1.reduceGroup(new PythonCombineIdentity(info.setID, info.combineOperator, info.meta))
+			return op1.reduceGroup(new PythonCombineIdentity(info.setID * -1))
 					.setCombinable(true).name("PythonCombine")
-					.mapPartition(new PythonMapPartition(info.setID * -1, info.operator, info.types, info.meta))
+					.mapPartition(new PythonMapPartition(info.setID, info.types))
 					.name(info.name);
 		} else {
 			return op1.reduceGroup(new PythonCombineIdentity())
 					.setCombinable(false).name("PythonReducePreStep")
-					.mapPartition(new PythonMapPartition(info.setID * -1, info.operator, info.types, info.meta))
+					.mapPartition(new PythonMapPartition(info.setID, info.types))
 					.name(info.name);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCoGroup.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCoGroup.java b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCoGroup.java
index 01f18eb..26d554d 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCoGroup.java
+++ b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCoGroup.java
@@ -31,9 +31,9 @@ public class PythonCoGroup<IN1, IN2, OUT> extends RichCoGroupFunction<IN1, IN2,
 	private final PythonStreamer streamer;
 	private transient final TypeInformation<OUT> typeInformation;
 
-	public PythonCoGroup(int id, byte[] operator, TypeInformation<OUT> typeInformation, String metaInformation) {
+	public PythonCoGroup(int id, TypeInformation<OUT> typeInformation) {
 		this.typeInformation = typeInformation;
-		streamer = new PythonStreamer(this, id, operator, metaInformation);
+		streamer = new PythonStreamer(this, id);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCombineIdentity.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCombineIdentity.java b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCombineIdentity.java
index 3395f07..a8ff96c 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCombineIdentity.java
+++ b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonCombineIdentity.java
@@ -31,8 +31,8 @@ public class PythonCombineIdentity<IN> extends RichGroupReduceFunction<IN, IN> {
 		streamer = null;
 	}
 
-	public PythonCombineIdentity(int id, byte[] operator, String metaInformation) {
-		streamer = new PythonStreamer(this, id, operator, metaInformation);
+	public PythonCombineIdentity(int id) {
+		streamer = new PythonStreamer(this, id);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonMapPartition.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonMapPartition.java b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonMapPartition.java
index f582e3d..1f13e5c 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonMapPartition.java
+++ b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/functions/PythonMapPartition.java
@@ -31,9 +31,9 @@ public class PythonMapPartition<IN, OUT> extends RichMapPartitionFunction<IN, OU
 	private final PythonStreamer streamer;
 	private transient final TypeInformation<OUT> typeInformation;
 
-	public PythonMapPartition(int id, byte[] operator, TypeInformation<OUT> typeInformation, String metaInformation) {
+	public PythonMapPartition(int id, TypeInformation<OUT> typeInformation) {
 		this.typeInformation = typeInformation;
-		streamer = new PythonStreamer(this, id, operator, metaInformation);
+		streamer = new PythonStreamer(this, id);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
index 7dc2240..6d21c4c 100644
--- a/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
+++ b/flink-staging/flink-language-binding/flink-python/src/main/java/org/apache/flink/languagebinding/api/java/python/streaming/PythonStreamer.java
@@ -13,10 +13,10 @@
 package org.apache.flink.languagebinding.api.java.python.streaming;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.lang.reflect.Field;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import static org.apache.flink.languagebinding.api.java.common.PlanBinder.DEBUG;
-import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON_EXECUTOR_NAME;
 import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON_DC_ID;
 import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.FLINK_PYTHON_PLAN_NAME;
 import static org.apache.flink.languagebinding.api.java.common.PlanBinder.FLINK_TMP_DATA_DIR;
@@ -30,24 +30,22 @@ import static org.apache.flink.languagebinding.api.java.python.PythonPlanBinder.
  * This streamer is used by functions to send/receive data to/from an external python process.
  */
 public class PythonStreamer extends Streamer {
-	private final byte[] operator;
 	private Process process;
-	private final String metaInformation;
 	private final int id;
 	private final boolean usePython3;
 	private final boolean debug;
 	private Thread shutdownThread;
+	private final String planArguments;
 
 	private String inputFilePath;
 	private String outputFilePath;
 
-	public PythonStreamer(AbstractRichFunction function, int id, byte[] operator, String metaInformation) {
+	public PythonStreamer(AbstractRichFunction function, int id) {
 		super(function);
-		this.operator = operator;
-		this.metaInformation = metaInformation;
 		this.id = id;
 		this.usePython3 = PythonPlanBinder.usePython3;
 		this.debug = DEBUG;
+		planArguments = PythonPlanBinder.arguments.toString();
 	}
 
 	/**
@@ -67,21 +65,8 @@ public class PythonStreamer extends Streamer {
 		sender.open(inputFilePath);
 		receiver.open(outputFilePath);
 
-		ProcessBuilder pb = new ProcessBuilder();
-
 		String path = function.getRuntimeContext().getDistributedCache().getFile(FLINK_PYTHON_DC_ID).getAbsolutePath();
-		String executorPath = path + FLINK_PYTHON_EXECUTOR_NAME;
-		String[] frag = metaInformation.split("\\|");
-		StringBuilder importString = new StringBuilder();
-		if (frag[0].contains("__main__")) {
-			importString.append("from ");
-			importString.append(FLINK_PYTHON_PLAN_NAME.substring(1, FLINK_PYTHON_PLAN_NAME.length() - 3));
-			importString.append(" import ");
-			importString.append(frag[1]);
-		} else {
-			importString.append("import ");
-			importString.append(FLINK_PYTHON_PLAN_NAME.substring(1, FLINK_PYTHON_PLAN_NAME.length() - 3));
-		}
+		String planPath = path + FLINK_PYTHON_PLAN_NAME;
 
 		String pythonBinaryPath = usePython3 ? FLINK_PYTHON3_BINARY_PATH : FLINK_PYTHON2_BINARY_PATH;
 
@@ -90,14 +75,13 @@ public class PythonStreamer extends Streamer {
 		} catch (IOException ex) {
 			throw new RuntimeException(pythonBinaryPath + " does not point to a valid python binary.");
 		}
-		pb.command(pythonBinaryPath, "-O", "-B", executorPath, "" + server.getLocalPort());
 
 		if (debug) {
 			socket.setSoTimeout(0);
 			LOG.info("Waiting for Python Process : " + function.getRuntimeContext().getTaskName()
-					+ " Run python /tmp/flink" + FLINK_PYTHON_EXECUTOR_NAME + " " + server.getLocalPort());
+					+ " Run python " + planPath + planArguments);
 		} else {
-			process = pb.start();
+			process = Runtime.getRuntime().exec(pythonBinaryPath + " -O -B " + planPath + planArguments);
 			new StreamPrinter(process.getInputStream()).start();
 			new StreamPrinter(process.getErrorStream(), true, msg).start();
 		}
@@ -114,31 +98,13 @@ public class PythonStreamer extends Streamer {
 
 		Runtime.getRuntime().addShutdownHook(shutdownThread);
 
-		socket = server.accept();
-		in = socket.getInputStream();
-		out = socket.getOutputStream();
-
-		byte[] opSize = new byte[4];
-		putInt(opSize, 0, operator.length);
-		out.write(opSize, 0, 4);
-		out.write(operator, 0, operator.length);
-
-		byte[] meta = importString.toString().getBytes("utf-8");
-		putInt(opSize, 0, meta.length);
-		out.write(opSize, 0, 4);
-		out.write(meta, 0, meta.length);
-
-		byte[] input = inputFilePath.getBytes("utf-8");
-		putInt(opSize, 0, input.length);
-		out.write(opSize, 0, 4);
-		out.write(input, 0, input.length);
-
-		byte[] output = outputFilePath.getBytes("utf-8");
-		putInt(opSize, 0, output.length);
-		out.write(opSize, 0, 4);
-		out.write(output, 0, output.length);
-
-		out.flush();
+		OutputStream processOutput = process.getOutputStream();
+		processOutput.write("operator\n".getBytes());
+		processOutput.write(("" + server.getLocalPort() + "\n").getBytes());
+		processOutput.write((id + "\n").getBytes());
+		processOutput.write((inputFilePath + "\n").getBytes());
+		processOutput.write((outputFilePath + "\n").getBytes());
+		processOutput.flush();
 
 		try { // wait a bit to catch syntax errors
 			Thread.sleep(2000);
@@ -151,6 +117,10 @@ public class PythonStreamer extends Streamer {
 			} catch (IllegalThreadStateException ise) { //process still active -> start receiving data
 			}
 		}
+
+		socket = server.accept();
+		in = socket.getInputStream();
+		out = socket.getOutputStream();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py
deleted file mode 100644
index 79301a6..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__diff.py
+++ /dev/null
@@ -1,229 +0,0 @@
-#!/usr/bin/env python
-#
-# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
-# Copyright (c) 2008-2014 California Institute of Technology.
-# License: 3-clause BSD.  The full license text is available at:
-#  - http://trac.mystic.cacr.caltech.edu/project/pathos/browser/dill/LICENSE
-
-"""
-Module to show if an object has changed since it was memorised
-"""
-
-import os
-import sys
-import types
-try:
-    import numpy
-    HAS_NUMPY = True
-except:
-    HAS_NUMPY = False
-try:
-    import builtins
-except ImportError:
-    import __builtin__ as builtins
-
-# memo of objects indexed by id to a tuple (attributes, sequence items)
-# attributes is a dict indexed by attribute name to attribute id
-# sequence items is either a list of ids, of a dictionary of keys to ids
-memo = {}
-id_to_obj = {}
-# types that cannot have changing attributes
-builtins_types = set((str, list, dict, set, frozenset, int))
-dont_memo = set(id(i) for i in (memo, sys.modules, sys.path_importer_cache,
-             os.environ, id_to_obj))
-
-
-def get_attrs(obj):
-    """
-    Gets all the attributes of an object though its __dict__ or return None
-    """
-    if type(obj) in builtins_types \
-       or type(obj) is type and obj in builtins_types:
-        return
-    try:
-        return obj.__dict__
-    except:
-        return
-
-
-def get_seq(obj, cache={str: False, frozenset: False, list: True, set: True,
-                        dict: True, tuple: True, type: False,
-                        types.ModuleType: False, types.FunctionType: False,
-                        types.BuiltinFunctionType: False}):
-    """
-    Gets all the items in a sequence or return None
-    """
-    o_type = type(obj)
-    hsattr = hasattr
-    if o_type in cache:
-        if cache[o_type]:
-            if hsattr(obj, "copy"):
-                return obj.copy()
-            return obj
-    elif HAS_NUMPY and o_type in (numpy.ndarray, numpy.ma.core.MaskedConstant):
-        if obj.shape and obj.size:
-            return obj
-        else:
-            return []
-    elif hsattr(obj, "__contains__") and hsattr(obj, "__iter__") \
-       and hsattr(obj, "__len__") and hsattr(o_type, "__contains__") \
-       and hsattr(o_type, "__iter__") and hsattr(o_type, "__len__"):
-        cache[o_type] = True
-        if hsattr(obj, "copy"):
-            return obj.copy()
-        return obj
-    else:
-        cache[o_type] = False
-        return None
-
-
-def memorise(obj, force=False):
-    """
-    Adds an object to the memo, and recursively adds all the objects
-    attributes, and if it is a container, its items. Use force=True to update
-    an object already in the memo. Updating is not recursively done.
-    """
-    obj_id = id(obj)
-    if obj_id in memo and not force or obj_id in dont_memo:
-        return
-    id_ = id
-    g = get_attrs(obj)
-    if g is None:
-        attrs_id = None
-    else:
-        attrs_id = dict((key,id_(value)) for key, value in g.items())
-
-    s = get_seq(obj)
-    if s is None:
-        seq_id = None
-    elif hasattr(s, "items"):
-        seq_id = dict((id_(key),id_(value)) for key, value in s.items())
-    else:
-        seq_id = [id_(i) for i in s]
-
-    memo[obj_id] = attrs_id, seq_id
-    id_to_obj[obj_id] = obj
-    mem = memorise
-    if g is not None:
-        [mem(value) for key, value in g.items()]
-
-    if s is not None:
-        if hasattr(s, "items"):
-            [(mem(key), mem(item))
-             for key, item in s.items()]
-        else:
-            [mem(item) for item in s]
-
-
-def release_gone():
-    itop, mp, src = id_to_obj.pop, memo.pop, sys.getrefcount
-    [(itop(id_), mp(id_)) for id_, obj in list(id_to_obj.items())
-     if src(obj) < 4]
-
-
-def whats_changed(obj, seen=None, simple=False, first=True):
-    """
-    Check an object against the memo. Returns a list in the form
-    (attribute changes, container changed). Attribute changes is a dict of
-    attribute name to attribute value. container changed is a boolean.
-    If simple is true, just returns a boolean. None for either item means
-    that it has not been checked yet
-    """
-    # Special cases
-    if first:
-        # ignore the _ variable, which only appears in interactive sessions
-        if "_" in builtins.__dict__:
-            del builtins._
-        if seen is None:
-            seen = {}
-
-    obj_id = id(obj)
-
-    if obj_id in seen:
-        if simple:
-            return any(seen[obj_id])
-        return seen[obj_id]
-
-    # Safety checks
-    if obj_id in dont_memo:
-        seen[obj_id] = [{}, False]
-        if simple:
-            return False
-        return seen[obj_id]
-    elif obj_id not in memo:
-        if simple:
-            return True
-        else:
-            raise RuntimeError("Object not memorised " + str(obj))
-
-    seen[obj_id] = ({}, False)
-
-    chngd = whats_changed
-    id_ = id
-
-    # compare attributes
-    attrs = get_attrs(obj)
-    if attrs is None:
-        changed = {}
-    else:
-        obj_attrs = memo[obj_id][0]
-        obj_get = obj_attrs.get
-        changed = dict((key,None) for key in obj_attrs if key not in attrs)
-        for key, o in attrs.items():
-            if id_(o) != obj_get(key, None) or chngd(o, seen, True, False):
-                changed[key] = o
-
-    # compare sequence
-    items = get_seq(obj)
-    seq_diff = False
-    if items is not None:
-        obj_seq = memo[obj_id][1]
-        if len(items) != len(obj_seq):
-            seq_diff = True
-        elif hasattr(obj, "items"):  # dict type obj
-            obj_get = obj_seq.get
-            for key, item in items.items():
-                if id_(item) != obj_get(id_(key)) \
-                   or chngd(key, seen, True, False) \
-                   or chngd(item, seen, True, False):
-                    seq_diff = True
-                    break
-        else:
-            for i, j in zip(items, obj_seq):  # list type obj
-                if id_(i) != j or chngd(i, seen, True, False):
-                    seq_diff = True
-                    break
-    seen[obj_id] = changed, seq_diff
-    if simple:
-        return changed or seq_diff
-    return changed, seq_diff
-
-
-def has_changed(*args, **kwds):
-    kwds['simple'] = True  # ignore simple if passed in
-    return whats_changed(*args, **kwds)
-
-__import__ = __import__
-
-
-def _imp(*args, **kwds):
-    """
-    Replaces the default __import__, to allow a module to be memorised
-    before the user can change it
-    """
-    before = set(sys.modules.keys())
-    mod = __import__(*args, **kwds)
-    after = set(sys.modules.keys()).difference(before)
-    for m in after:
-        memorise(sys.modules[m])
-    return mod
-
-builtins.__import__ = _imp
-if hasattr(builtins, "_"):
-    del builtins._
-
-# memorise all already imported modules. This implies that this must be
-# imported first for any changes to be recorded
-for mod in sys.modules.values():
-    memorise(mod)
-release_gone()

http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py
deleted file mode 100644
index b03eda9..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/__init__.py
+++ /dev/null
@@ -1,73 +0,0 @@
-#!/usr/bin/env python
-#
-# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
-# Copyright (c) 2008-2014 California Institute of Technology.
-# License: 3-clause BSD.  The full license text is available at:
-#  - http://trac.mystic.cacr.caltech.edu/project/pathos/browser/dill/LICENSE
-
-from __future__ import absolute_import
-
-from .dill import dump, dumps, load, loads, dump_session, load_session, \
-    Pickler, Unpickler, register, copy, pickle, pickles, \
-    HIGHEST_PROTOCOL, DEFAULT_PROTOCOL, PicklingError, UnpicklingError, \
-    HANDLE_FMODE, CONTENTS_FMODE, FILE_FMODE
-from . import source, temp, detect
-
-# make sure "trace" is turned off
-detect.trace(False)
-
-try:
-    from imp import reload
-except ImportError:
-    pass
-
-# put the objects in order, if possible
-try:
-    from collections import OrderedDict as odict
-except ImportError:
-    try:
-        from ordereddict import OrderedDict as odict
-    except ImportError:
-        odict = dict
-objects = odict()
-# local import of dill._objects
-#from . import _objects
-#objects.update(_objects.succeeds)
-#del _objects
-
-# local import of dill.objtypes
-from . import objtypes as types
-
-def load_types(pickleable=True, unpickleable=True):
-    """load pickleable and/or unpickleable types to dill.types"""
-    # local import of dill.objects
-    from . import _objects
-    if pickleable:
-        objects.update(_objects.succeeds)
-    else:
-        [objects.pop(obj,None) for obj in _objects.succeeds]
-    if unpickleable:
-        objects.update(_objects.failures)
-    else:
-        [objects.pop(obj,None) for obj in _objects.failures]
-    objects.update(_objects.registered)
-    del _objects
-    # reset contents of types to 'empty'
-    [types.__dict__.pop(obj) for obj in list(types.__dict__.keys()) \
-                             if obj.find('Type') != -1]
-    # add corresponding types from objects to types
-    reload(types)
-
-def extend(use_dill=True):
-    '''add (or remove) dill types to/from pickle'''
-    from .dill import _revert_extension, _extend
-    if use_dill: _extend()
-    else: _revert_extension()
-    return
-
-extend()
-
-del absolute_import
-del odict
-
-# end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py
deleted file mode 100644
index b89bc0e..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/_objects.py
+++ /dev/null
@@ -1,530 +0,0 @@
-#!/usr/bin/env python
-#
-# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
-# Copyright (c) 2008-2014 California Institute of Technology.
-# License: 3-clause BSD.  The full license text is available at:
-#  - http://trac.mystic.cacr.caltech.edu/project/pathos/browser/dill/LICENSE
-"""
-all Python Standard Library objects (currently: CH 1-15 @ 2.7)
-and some other common objects (i.e. numpy.ndarray)
-"""
-
-__all__ = ['registered','failures','succeeds']
-
-# helper imports
-import warnings; warnings.filterwarnings("ignore", category=DeprecationWarning)
-import sys
-PY3 = (hex(sys.hexversion) >= '0x30000f0')
-if PY3:
-    import queue as Queue
-    import dbm as anydbm
-else:
-    import Queue
-    import anydbm
-    import sets # deprecated/removed
-    import mutex # removed
-try:
-    from cStringIO import StringIO # has StringI and StringO types
-except ImportError: # only has StringIO type
-    if PY3:
-        from io import BytesIO as StringIO
-    else:
-        from StringIO import StringIO
-import re
-import array
-import collections
-import codecs
-import struct
-import datetime
-import calendar
-import weakref
-import pprint
-import decimal
-import functools
-import itertools
-import operator
-import tempfile
-import shelve
-import zlib
-import gzip
-import zipfile
-import tarfile
-import xdrlib
-import csv
-import hashlib
-import hmac
-import os
-import logging
-import optparse
-import curses
-#import __hello__
-import threading
-import socket
-import contextlib
-try:
-    import bz2
-    import sqlite3
-    if PY3: import dbm.ndbm as dbm
-    else: import dbm
-    HAS_ALL = True
-except ImportError: # Ubuntu
-    HAS_ALL = False
-try:
-    import ctypes
-    HAS_CTYPES = True
-except ImportError: # MacPorts
-    HAS_CTYPES = False
-from curses import textpad, panel
-
-# helper objects
-class _class:
-    def _method(self):
-        pass
-#   @classmethod
-#   def _clsmethod(cls): #XXX: test me
-#       pass
-#   @staticmethod
-#   def _static(self): #XXX: test me
-#       pass
-class _class2:
-    def __call__(self):
-        pass
-_instance2 = _class2()
-class _newclass(object):
-    def _method(self):
-        pass
-#   @classmethod
-#   def _clsmethod(cls): #XXX: test me
-#       pass
-#   @staticmethod
-#   def _static(self): #XXX: test me
-#       pass
-def _function(x): yield x
-def _function2():
-    try: raise
-    except:
-        from sys import exc_info
-        e, er, tb = exc_info()
-        return er, tb
-if HAS_CTYPES:
-    class _Struct(ctypes.Structure):
-        pass
-    _Struct._fields_ = [("_field", ctypes.c_int),("next", ctypes.POINTER(_Struct))]
-_filedescrip, _tempfile = tempfile.mkstemp('r') # deleted in cleanup
-_tmpf = tempfile.TemporaryFile('w')
-
-# put the objects in order, if possible
-try:
-    from collections import OrderedDict as odict
-except ImportError:
-    try:
-        from ordereddict import OrderedDict as odict
-    except ImportError:
-        odict = dict
-# objects used by dill for type declaration
-registered = d = odict()
-# objects dill fails to pickle
-failures = x = odict()
-# all other type objects
-succeeds = a = odict()
-
-# types module (part of CH 8)
-a['BooleanType'] = bool(1)
-a['BuiltinFunctionType'] = len
-a['BuiltinMethodType'] = a['BuiltinFunctionType']
-a['BytesType'] = _bytes = codecs.latin_1_encode('\x00')[0] # bytes(1)
-a['ClassType'] = _class
-a['ComplexType'] = complex(1)
-a['DictType'] = _dict = {}
-a['DictionaryType'] = a['DictType']
-a['FloatType'] = float(1)
-a['FunctionType'] = _function
-a['InstanceType'] = _instance = _class()
-a['IntType'] = _int = int(1)
-a['ListType'] = _list = []
-a['NoneType'] = None
-a['ObjectType'] = object()
-a['StringType'] = _str = str(1)
-a['TupleType'] = _tuple = ()
-a['TypeType'] = type
-if PY3:
-    a['LongType'] = _int
-    a['UnicodeType'] = _str
-else:
-    a['LongType'] = long(1)
-    a['UnicodeType'] = unicode(1)
-# built-in constants (CH 4)
-a['CopyrightType'] = copyright
-# built-in types (CH 5)
-a['ClassObjectType'] = _newclass # <type 'type'>
-a['ClassInstanceType'] = _newclass() # <type 'class'>
-a['SetType'] = _set = set()
-a['FrozenSetType'] = frozenset()
-# built-in exceptions (CH 6)
-a['ExceptionType'] = _exception = _function2()[0]
-# string services (CH 7)
-a['SREPatternType'] = _srepattern = re.compile('')
-# data types (CH 8)
-a['ArrayType'] = array.array("f")
-a['DequeType'] = collections.deque([0])
-a['DefaultDictType'] = collections.defaultdict(_function, _dict)
-a['TZInfoType'] = datetime.tzinfo()
-a['DateTimeType'] = datetime.datetime.today()
-a['CalendarType'] = calendar.Calendar()
-if not PY3:
-    a['SetsType'] = sets.Set()
-    a['ImmutableSetType'] = sets.ImmutableSet()
-    a['MutexType'] = mutex.mutex()
-# numeric and mathematical types (CH 9)
-a['DecimalType'] = decimal.Decimal(1)
-a['CountType'] = itertools.count(0)
-# data compression and archiving (CH 12)
-a['TarInfoType'] = tarfile.TarInfo()
-# generic operating system services (CH 15)
-a['LoggerType'] = logging.getLogger()
-a['FormatterType'] = logging.Formatter() # pickle ok
-a['FilterType'] = logging.Filter() # pickle ok
-a['LogRecordType'] = logging.makeLogRecord(_dict) # pickle ok
-a['OptionParserType'] = _oparser = optparse.OptionParser() # pickle ok
-a['OptionGroupType'] = optparse.OptionGroup(_oparser,"foo") # pickle ok
-a['OptionType'] = optparse.Option('--foo') # pickle ok
-if HAS_CTYPES:
-    a['CCharType'] = _cchar = ctypes.c_char()
-    a['CWCharType'] = ctypes.c_wchar() # fail == 2.6
-    a['CByteType'] = ctypes.c_byte()
-    a['CUByteType'] = ctypes.c_ubyte()
-    a['CShortType'] = ctypes.c_short()
-    a['CUShortType'] = ctypes.c_ushort()
-    a['CIntType'] = ctypes.c_int()
-    a['CUIntType'] = ctypes.c_uint()
-    a['CLongType'] = ctypes.c_long()
-    a['CULongType'] = ctypes.c_ulong()
-    a['CLongLongType'] = ctypes.c_longlong()
-    a['CULongLongType'] = ctypes.c_ulonglong()
-    a['CFloatType'] = ctypes.c_float()
-    a['CDoubleType'] = ctypes.c_double()
-    a['CSizeTType'] = ctypes.c_size_t()
-    a['CLibraryLoaderType'] = ctypes.cdll
-    a['StructureType'] = _Struct
-    a['BigEndianStructureType'] = ctypes.BigEndianStructure()
-#NOTE: also LittleEndianStructureType and UnionType... abstract classes
-#NOTE: remember for ctypesobj.contents creates a new python object
-#NOTE: ctypes.c_int._objects is memberdescriptor for object's __dict__
-#NOTE: base class of all ctypes data types is non-public _CData
-
-try: # python 2.6
-    import fractions
-    import number
-    import io
-    from io import StringIO as TextIO
-    # built-in functions (CH 2)
-    a['ByteArrayType'] = bytearray([1])
-    # numeric and mathematical types (CH 9)
-    a['FractionType'] = fractions.Fraction()
-    a['NumberType'] = numbers.Number()
-    # generic operating system services (CH 15)
-    a['IOBaseType'] = io.IOBase()
-    a['RawIOBaseType'] = io.RawIOBase()
-    a['TextIOBaseType'] = io.TextIOBase()
-    a['BufferedIOBaseType'] = io.BufferedIOBase()
-    a['UnicodeIOType'] = TextIO() # the new StringIO
-    a['LoggingAdapterType'] = logging.LoggingAdapter(_logger,_dict) # pickle ok
-    if HAS_CTYPES:
-        a['CBoolType'] = ctypes.c_bool(1)
-        a['CLongDoubleType'] = ctypes.c_longdouble()
-except ImportError:
-    pass
-try: # python 2.7
-    import argparse
-    # data types (CH 8)
-    a['OrderedDictType'] = collections.OrderedDict(_dict)
-    a['CounterType'] = collections.Counter(_dict)
-    if HAS_CTYPES:
-        a['CSSizeTType'] = ctypes.c_ssize_t()
-    # generic operating system services (CH 15)
-    a['NullHandlerType'] = logging.NullHandler() # pickle ok  # new 2.7
-    a['ArgParseFileType'] = argparse.FileType() # pickle ok
-#except AttributeError:
-except ImportError:
-    pass
-
-# -- pickle fails on all below here -----------------------------------------
-# types module (part of CH 8)
-a['CodeType'] = compile('','','exec')
-a['DictProxyType'] = type.__dict__
-a['DictProxyType2'] = _newclass.__dict__
-a['EllipsisType'] = Ellipsis
-a['ClosedFileType'] = open(os.devnull, 'wb', buffering=0).close()
-a['GetSetDescriptorType'] = array.array.typecode
-a['LambdaType'] = _lambda = lambda x: lambda y: x #XXX: works when not imported!
-a['MemberDescriptorType'] = type.__dict__['__weakrefoffset__']
-a['MemberDescriptorType2'] = datetime.timedelta.days
-a['MethodType'] = _method = _class()._method #XXX: works when not imported!
-a['ModuleType'] = datetime
-a['NotImplementedType'] = NotImplemented
-a['SliceType'] = slice(1)
-a['UnboundMethodType'] = _class._method #XXX: works when not imported!
-a['TextWrapperType'] = open(os.devnull, 'r') # same as mode='w','w+','r+'
-a['BufferedRandomType'] = open(os.devnull, 'r+b') # same as mode='w+b'
-a['BufferedReaderType'] = open(os.devnull, 'rb') # (default: buffering=-1)
-a['BufferedWriterType'] = open(os.devnull, 'wb')
-try: # oddities: deprecated
-    from _pyio import open as _open
-    a['PyTextWrapperType'] = _open(os.devnull, 'r', buffering=-1)
-    a['PyBufferedRandomType'] = _open(os.devnull, 'r+b', buffering=-1)
-    a['PyBufferedReaderType'] = _open(os.devnull, 'rb', buffering=-1)
-    a['PyBufferedWriterType'] = _open(os.devnull, 'wb', buffering=-1)
-except ImportError:
-    pass
-# other (concrete) object types
-if PY3:
-    d['CellType'] = (_lambda)(0).__closure__[0]
-    a['XRangeType'] = _xrange = range(1)
-else:
-    d['CellType'] = (_lambda)(0).func_closure[0]
-    a['XRangeType'] = _xrange = xrange(1)
-d['MethodDescriptorType'] = type.__dict__['mro']
-d['WrapperDescriptorType'] = type.__repr__
-a['WrapperDescriptorType2'] = type.__dict__['__module__']
-# built-in functions (CH 2)
-if PY3: _methodwrap = (1).__lt__
-else: _methodwrap = (1).__cmp__
-d['MethodWrapperType'] = _methodwrap
-a['StaticMethodType'] = staticmethod(_method)
-a['ClassMethodType'] = classmethod(_method)
-a['PropertyType'] = property()
-d['SuperType'] = super(Exception, _exception)
-# string services (CH 7)
-if PY3: _in = _bytes
-else: _in = _str
-a['InputType'] = _cstrI = StringIO(_in)
-a['OutputType'] = _cstrO = StringIO()
-# data types (CH 8)
-a['WeakKeyDictionaryType'] = weakref.WeakKeyDictionary()
-a['WeakValueDictionaryType'] = weakref.WeakValueDictionary()
-a['ReferenceType'] = weakref.ref(_instance)
-a['DeadReferenceType'] = weakref.ref(_class())
-a['ProxyType'] = weakref.proxy(_instance)
-a['DeadProxyType'] = weakref.proxy(_class())
-a['CallableProxyType'] = weakref.proxy(_instance2)
-a['DeadCallableProxyType'] = weakref.proxy(_class2())
-a['QueueType'] = Queue.Queue()
-# numeric and mathematical types (CH 9)
-d['PartialType'] = functools.partial(int,base=2)
-if PY3:
-    a['IzipType'] = zip('0','1')
-else:
-    a['IzipType'] = itertools.izip('0','1')
-a['ChainType'] = itertools.chain('0','1')
-d['ItemGetterType'] = operator.itemgetter(0)
-d['AttrGetterType'] = operator.attrgetter('__repr__')
-# file and directory access (CH 10)
-if PY3: _fileW = _cstrO
-else: _fileW = _tmpf
-# data persistence (CH 11)
-if HAS_ALL:
-    a['ConnectionType'] = _conn = sqlite3.connect(':memory:')
-    a['CursorType'] = _conn.cursor()
-a['ShelveType'] = shelve.Shelf({})
-# data compression and archiving (CH 12)
-if HAS_ALL:
-    a['BZ2FileType'] = bz2.BZ2File(os.devnull) #FIXME: fail >= 3.3
-    a['BZ2CompressorType'] = bz2.BZ2Compressor()
-    a['BZ2DecompressorType'] = bz2.BZ2Decompressor()
-#a['ZipFileType'] = _zip = zipfile.ZipFile(os.devnull,'w') #FIXME: fail >= 3.2
-#_zip.write(_tempfile,'x') [causes annoying warning/error printed on import]
-#a['ZipInfoType'] = _zip.getinfo('x')
-a['TarFileType'] = tarfile.open(fileobj=_fileW,mode='w')
-# file formats (CH 13)
-a['DialectType'] = csv.get_dialect('excel')
-a['PackerType'] = xdrlib.Packer()
-# optional operating system services (CH 16)
-a['LockType'] = threading.Lock()
-a['RLockType'] = threading.RLock()
-# generic operating system services (CH 15) # also closed/open and r/w/etc...
-a['NamedLoggerType'] = _logger = logging.getLogger(__name__) #FIXME: fail >= 3.2 and <= 2.6
-#a['FrozenModuleType'] = __hello__ #FIXME: prints "Hello world..."
-# interprocess communication (CH 17)
-if PY3:
-    a['SocketType'] = _socket = socket.socket() #FIXME: fail >= 3.3
-    a['SocketPairType'] = socket.socketpair()[0] #FIXME: fail >= 3.3
-else:
-    a['SocketType'] = _socket = socket.socket()
-    a['SocketPairType'] = _socket._sock
-# python runtime services (CH 27)
-if PY3:
-    a['GeneratorContextManagerType'] = contextlib.contextmanager(max)([1])
-else:
-    a['GeneratorContextManagerType'] = contextlib.GeneratorContextManager(max)
-
-try: # ipython
-    __IPYTHON__ is True # is ipython
-except NameError:
-    # built-in constants (CH 4)
-    a['QuitterType'] = quit
-    d['ExitType'] = a['QuitterType']
-try: # numpy
-    from numpy import ufunc as _numpy_ufunc
-    from numpy import array as _numpy_array
-    from numpy import int32 as _numpy_int32
-    a['NumpyUfuncType'] = _numpy_ufunc
-    a['NumpyArrayType'] = _numpy_array
-    a['NumpyInt32Type'] = _numpy_int32
-except ImportError:
-    pass
-try: # python 2.6
-    # numeric and mathematical types (CH 9)
-    a['ProductType'] = itertools.product('0','1')
-    # generic operating system services (CH 15)
-    a['FileHandlerType'] = logging.FileHandler(os.devnull) #FIXME: fail >= 3.2 and <= 2.6
-    a['RotatingFileHandlerType'] = logging.handlers.RotatingFileHandler(os.devnull)
-    a['SocketHandlerType'] = logging.handlers.SocketHandler('localhost',514)
-    a['MemoryHandlerType'] = logging.handlers.MemoryHandler(1)
-except AttributeError:
-    pass
-try: # python 2.7
-    # data types (CH 8)
-    a['WeakSetType'] = weakref.WeakSet() # 2.7
-#   # generic operating system services (CH 15) [errors when dill is imported]
-#   a['ArgumentParserType'] = _parser = argparse.ArgumentParser('PROG')
-#   a['NamespaceType'] = _parser.parse_args() # pickle ok
-#   a['SubParsersActionType'] = _parser.add_subparsers()
-#   a['MutuallyExclusiveGroupType'] = _parser.add_mutually_exclusive_group()
-#   a['ArgumentGroupType'] = _parser.add_argument_group()
-except AttributeError:
-    pass
-
-# -- dill fails in some versions below here ---------------------------------
-# types module (part of CH 8)
-a['FileType'] = open(os.devnull, 'rb', buffering=0) # same 'wb','wb+','rb+'
-# FIXME: FileType fails >= 3.1
-# built-in functions (CH 2)
-a['ListIteratorType'] = iter(_list) # empty vs non-empty FIXME: fail < 3.2
-a['TupleIteratorType']= iter(_tuple) # empty vs non-empty FIXME: fail < 3.2
-a['XRangeIteratorType'] = iter(_xrange) # empty vs non-empty FIXME: fail < 3.2
-# data types (CH 8)
-a['PrettyPrinterType'] = pprint.PrettyPrinter() #FIXME: fail >= 3.2 and == 2.5
-# numeric and mathematical types (CH 9)
-a['CycleType'] = itertools.cycle('0') #FIXME: fail < 3.2
-# file and directory access (CH 10)
-a['TemporaryFileType'] = _tmpf #FIXME: fail >= 3.2 and == 2.5
-# data compression and archiving (CH 12)
-a['GzipFileType'] = gzip.GzipFile(fileobj=_fileW) #FIXME: fail > 3.2 and <= 2.6
-# generic operating system services (CH 15)
-a['StreamHandlerType'] = logging.StreamHandler() #FIXME: fail >= 3.2 and == 2.5
-try: # python 2.6
-    # numeric and mathematical types (CH 9)
-    a['PermutationsType'] = itertools.permutations('0') #FIXME: fail < 3.2
-    a['CombinationsType'] = itertools.combinations('0',1) #FIXME: fail < 3.2
-except AttributeError:
-    pass
-try: # python 2.7
-    # numeric and mathematical types (CH 9)
-    a['RepeatType'] = itertools.repeat(0) #FIXME: fail < 3.2
-    a['CompressType'] = itertools.compress('0',[1]) #FIXME: fail < 3.2
-    #XXX: ...and etc
-except AttributeError:
-    pass
-
-# -- dill fails on all below here -------------------------------------------
-# types module (part of CH 8)
-x['GeneratorType'] = _generator = _function(1) #XXX: priority
-x['FrameType'] = _generator.gi_frame #XXX: inspect.currentframe()
-x['TracebackType'] = _function2()[1] #(see: inspect.getouterframes,getframeinfo)
-# other (concrete) object types
-# (also: Capsule / CObject ?)
-# built-in functions (CH 2)
-x['SetIteratorType'] = iter(_set) #XXX: empty vs non-empty
-# built-in types (CH 5)
-if PY3:
-    x['DictionaryItemIteratorType'] = iter(type.__dict__.items())
-    x['DictionaryKeyIteratorType'] = iter(type.__dict__.keys())
-    x['DictionaryValueIteratorType'] = iter(type.__dict__.values())
-else:
-    x['DictionaryItemIteratorType'] = type.__dict__.iteritems()
-    x['DictionaryKeyIteratorType'] = type.__dict__.iterkeys()
-    x['DictionaryValueIteratorType'] = type.__dict__.itervalues()
-# string services (CH 7)
-x['StructType'] = struct.Struct('c')
-x['CallableIteratorType'] = _srepattern.finditer('')
-x['SREMatchType'] = _srepattern.match('')
-x['SREScannerType'] = _srepattern.scanner('')
-x['StreamReader'] = codecs.StreamReader(_cstrI) #XXX: ... and etc
-# python object persistence (CH 11)
-# x['DbShelveType'] = shelve.open('foo','n')#,protocol=2) #XXX: delete foo
-if HAS_ALL:
-    x['DbmType'] = dbm.open(_tempfile,'n')
-# x['DbCursorType'] = _dbcursor = anydbm.open('foo','n') #XXX: delete foo
-# x['DbType'] = _dbcursor.db
-# data compression and archiving (CH 12)
-x['ZlibCompressType'] = zlib.compressobj()
-x['ZlibDecompressType'] = zlib.decompressobj()
-# file formats (CH 13)
-x['CSVReaderType'] = csv.reader(_cstrI)
-x['CSVWriterType'] = csv.writer(_cstrO)
-x['CSVDictReaderType'] = csv.DictReader(_cstrI)
-x['CSVDictWriterType'] = csv.DictWriter(_cstrO,{})
-# cryptographic services (CH 14)
-x['HashType'] = hashlib.md5()
-x['HMACType'] = hmac.new(_in)
-# generic operating system services (CH 15)
-#x['CursesWindowType'] = _curwin = curses.initscr() #FIXME: messes up tty
-#x['CursesTextPadType'] = textpad.Textbox(_curwin)
-#x['CursesPanelType'] = panel.new_panel(_curwin)
-if HAS_CTYPES:
-    x['CCharPType'] = ctypes.c_char_p()
-    x['CWCharPType'] = ctypes.c_wchar_p()
-    x['CVoidPType'] = ctypes.c_void_p()
-    x['CDLLType'] = _cdll = ctypes.CDLL(None)
-    x['PyDLLType'] = _pydll = ctypes.pythonapi
-    x['FuncPtrType'] = _cdll._FuncPtr()
-    x['CCharArrayType'] = ctypes.create_string_buffer(1)
-    x['CWCharArrayType'] = ctypes.create_unicode_buffer(1)
-    x['CParamType'] = ctypes.byref(_cchar)
-    x['LPCCharType'] = ctypes.pointer(_cchar)
-    x['LPCCharObjType'] = _lpchar = ctypes.POINTER(ctypes.c_char)
-    x['NullPtrType'] = _lpchar()
-    x['NullPyObjectType'] = ctypes.py_object()
-    x['PyObjectType'] = ctypes.py_object(1)
-    x['FieldType'] = _field = _Struct._field
-    x['CFUNCTYPEType'] = _cfunc = ctypes.CFUNCTYPE(ctypes.c_char)
-    x['CFunctionType'] = _cfunc(str)
-try: # python 2.6
-    # numeric and mathematical types (CH 9)
-    x['MethodCallerType'] = operator.methodcaller('mro') # 2.6
-except AttributeError:
-    pass
-try: # python 2.7
-    # built-in types (CH 5)
-    x['MemoryType'] = memoryview(_in) # 2.7
-    x['MemoryType2'] = memoryview(bytearray(_in)) # 2.7
-    if PY3:
-        x['DictItemsType'] = _dict.items() # 2.7
-        x['DictKeysType'] = _dict.keys() # 2.7
-        x['DictValuesType'] = _dict.values() # 2.7
-    else:
-        x['DictItemsType'] = _dict.viewitems() # 2.7
-        x['DictKeysType'] = _dict.viewkeys() # 2.7
-        x['DictValuesType'] = _dict.viewvalues() # 2.7
-    # generic operating system services (CH 15)
-    x['RawTextHelpFormatterType'] = argparse.RawTextHelpFormatter('PROG')
-    x['RawDescriptionHelpFormatterType'] = argparse.RawDescriptionHelpFormatter('PROG')
-    x['ArgDefaultsHelpFormatterType'] = argparse.ArgumentDefaultsHelpFormatter('PROG')
-except NameError:
-    pass
-try: # python 2.7 (and not 3.1)
-    x['CmpKeyType'] = _cmpkey = functools.cmp_to_key(_methodwrap) # 2.7, >=3.2
-    x['CmpKeyObjType'] = _cmpkey('0') #2.7, >=3.2
-except AttributeError:
-    pass
-if PY3: # oddities: removed, etc
-    x['BufferType'] = x['MemoryType']
-else:
-    x['BufferType'] = buffer('')
-
-# -- cleanup ----------------------------------------------------------------
-a.update(d) # registered also succeed
-os.remove(_tempfile)
-
-
-# EOF

http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py
deleted file mode 100644
index 749a573..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/detect.py
+++ /dev/null
@@ -1,222 +0,0 @@
-#!/usr/bin/env python
-#
-# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
-# Copyright (c) 2008-2014 California Institute of Technology.
-# License: 3-clause BSD.  The full license text is available at:
-#  - http://trac.mystic.cacr.caltech.edu/project/pathos/browser/dill/LICENSE
-"""
-Methods for detecting objects leading to pickling failures.
-"""
-
-from __future__ import absolute_import
-from inspect import ismethod, isfunction, istraceback, isframe, iscode
-from .pointers import parent, reference, at, parents, children
-
-from .dill import _trace as trace
-from .dill import PY3
-
-def outermost(func): # is analogous to getsource(func,enclosing=True)
-    """get outermost enclosing object (i.e. the outer function in a closure)
-
-    NOTE: this is the object-equivalent of getsource(func, enclosing=True)
-    """
-    if PY3:
-        if ismethod(func):
-            _globals = func.__func__.__globals__ or {}
-        elif isfunction(func):
-            _globals = func.__globals__ or {}
-        else:
-            return #XXX: or raise? no matches
-        _globals = _globals.items()
-    else:
-        if ismethod(func):
-            _globals = func.im_func.func_globals or {}
-        elif isfunction(func):
-            _globals = func.func_globals or {}
-        else:
-            return #XXX: or raise? no matches
-        _globals = _globals.iteritems()
-    # get the enclosing source
-    from .source import getsourcelines
-    try: lines,lnum = getsourcelines(func, enclosing=True)
-    except: #TypeError, IOError
-        lines,lnum = [],None
-    code = ''.join(lines)
-    # get all possible names,objects that are named in the enclosing source
-    _locals = ((name,obj) for (name,obj) in _globals if name in code)
-    # now only save the objects that generate the enclosing block
-    for name,obj in _locals: #XXX: don't really need 'name'
-        try:
-            if getsourcelines(obj) == (lines,lnum): return obj
-        except: #TypeError, IOError
-            pass
-    return #XXX: or raise? no matches
-
-def nestedcode(func): #XXX: or return dict of {co_name: co} ?
-    """get the code objects for any nested functions (e.g. in a closure)"""
-    func = code(func)
-    if not iscode(func): return [] #XXX: or raise? no matches
-    nested = []
-    for co in func.co_consts:
-        if co is None: continue
-        co = code(co)
-        if co: nested.append(co)
-    return nested
-
-def code(func):
-    '''get the code object for the given function or method
-
-    NOTE: use dill.source.getsource(CODEOBJ) to get the source code
-    '''
-    if PY3:
-        im_func = '__func__'
-        func_code = '__code__'
-    else:
-        im_func = 'im_func'
-        func_code = 'func_code'
-    if ismethod(func): func = getattr(func, im_func)
-    if isfunction(func): func = getattr(func, func_code)
-    if istraceback(func): func = func.tb_frame
-    if isframe(func): func = func.f_code
-    if iscode(func): return func
-    return
-
-def nested(func): #XXX: or return dict of {__name__: obj} ?
-    """get any functions inside of func (e.g. inner functions in a closure)
-
-    NOTE: results may differ if the function has been executed or not.
-    If len(nestedcode(func)) > len(nested(func)), try calling func().
-    If possible, python builds code objects, but delays building functions
-    until func() is called.
-    """
-    if PY3:
-        att1 = '__code__'
-        att0 = '__func__'
-    else:
-        att1 = 'func_code' # functions
-        att0 = 'im_func'   # methods
-
-    import gc
-    funcs = []
-    # get the code objects, and try to track down by referrence
-    for co in nestedcode(func):
-        # look for function objects that refer to the code object
-        for obj in gc.get_referrers(co):
-            # get methods
-            _ = getattr(obj, att0, None) # ismethod
-            if getattr(_, att1, None) is co: funcs.append(obj)
-            # get functions
-            elif getattr(obj, att1, None) is co: funcs.append(obj)
-            # get frame objects
-            elif getattr(obj, 'f_code', None) is co: funcs.append(obj)
-            # get code objects
-            elif hasattr(obj, 'co_code') and obj is co: funcs.append(obj)
-#     frameobjs => func.func_code.co_varnames not in func.func_code.co_cellvars
-#     funcobjs => func.func_code.co_cellvars not in func.func_code.co_varnames
-#     frameobjs are not found, however funcobjs are...
-#     (see: test_mixins.quad ... and test_mixins.wtf)
-#     after execution, code objects get compiled, and them may be found by gc
-    return funcs
-
-
-def freevars(func):
-    """get objects defined in enclosing code that are referred to by func
-
-    returns a dict of {name:object}"""
-    if PY3:
-        im_func = '__func__'
-        func_code = '__code__'
-        func_closure = '__closure__'
-    else:
-        im_func = 'im_func'
-        func_code = 'func_code'
-        func_closure = 'func_closure'
-    if ismethod(func): func = getattr(func, im_func)
-    if isfunction(func):
-        closures = getattr(func, func_closure) or ()
-        func = getattr(func, func_code).co_freevars # get freevars
-    else:
-        return {}
-    return dict((name,c.cell_contents) for (name,c) in zip(func,closures))
-
-def globalvars(func):
-    """get objects defined in global scope that are referred to by func
-
-    return a dict of {name:object}"""
-    if PY3:
-        im_func = '__func__'
-        func_code = '__code__'
-        func_globals = '__globals__'
-    else:
-        im_func = 'im_func'
-        func_code = 'func_code'
-        func_globals = 'func_globals'
-    if ismethod(func): func = getattr(func, im_func)
-    if isfunction(func):
-        globs = getattr(func, func_globals) or {}
-        func = getattr(func, func_code).co_names # get names
-    else:
-        return {}
-    #NOTE: if name not in func_globals, then we skip it...
-    return dict((name,globs[name]) for name in func if name in globs)
-
-def varnames(func):
-    """get names of variables defined by func
-
-    returns a tuple (local vars, local vars referrenced by nested functions)"""
-    func = code(func)
-    if not iscode(func):
-        return () #XXX: better ((),())? or None?
-    return func.co_varnames, func.co_cellvars
-
-
-def baditems(obj, exact=False, safe=False): #XXX: obj=globals() ?
-    """get items in object that fail to pickle"""
-    if not hasattr(obj,'__iter__'): # is not iterable
-        return [j for j in (badobjects(obj,0,exact,safe),) if j is not None]
-    obj = obj.values() if getattr(obj,'values',None) else obj
-    _obj = [] # can't use a set, as items may be unhashable
-    [_obj.append(badobjects(i,0,exact,safe)) for i in obj if i not in _obj]
-    return [j for j in _obj if j is not None]
-
-
-def badobjects(obj, depth=0, exact=False, safe=False):
-    """get objects that fail to pickle"""
-    from dill import pickles
-    if not depth:
-        if pickles(obj,exact,safe): return None
-        return obj
-    return dict(((attr, badobjects(getattr(obj,attr),depth-1,exact,safe)) \
-           for attr in dir(obj) if not pickles(getattr(obj,attr),exact,safe)))
-
-def badtypes(obj, depth=0, exact=False, safe=False):
-    """get types for objects that fail to pickle"""
-    from dill import pickles
-    if not depth:
-        if pickles(obj,exact,safe): return None
-        return type(obj)
-    return dict(((attr, badtypes(getattr(obj,attr),depth-1,exact,safe)) \
-           for attr in dir(obj) if not pickles(getattr(obj,attr),exact,safe)))
-
-def errors(obj, depth=0, exact=False, safe=False):
-    """get errors for objects that fail to pickle"""
-    from dill import pickles, copy
-    if not depth:
-        try:
-            pik = copy(obj)
-            if exact:
-                assert pik == obj, \
-                    "Unpickling produces %s instead of %s" % (pik,obj)
-            assert type(pik) == type(obj), \
-                "Unpickling produces %s instead of %s" % (type(pik),type(obj))
-            return None
-        except Exception:
-            import sys
-            return sys.exc_info()[1]
-    return dict(((attr, errors(getattr(obj,attr),depth-1,exact,safe)) \
-           for attr in dir(obj) if not pickles(getattr(obj,attr),exact,safe)))
-
-del absolute_import
-
-
-# EOF


[2/3] flink git commit: [FLINK-1927][FLINK-2173][py] Operator distribution rework, fix file paths

Posted by mx...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py
deleted file mode 100644
index cddb9ca..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/dill.py
+++ /dev/null
@@ -1,1034 +0,0 @@
-# -*- coding: utf-8 -*-
-#
-# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
-# Copyright (c) 2008-2014 California Institute of Technology.
-# License: 3-clause BSD.  The full license text is available at:
-#  - http://trac.mystic.cacr.caltech.edu/project/pathos/browser/dill/LICENSE
-"""
-dill: a utility for serialization of python objects
-
-Based on code written by Oren Tirosh and Armin Ronacher.
-Extended to a (near) full set of the builtin types (in types module),
-and coded to the pickle interface, by <mm...@caltech.edu>.
-Initial port to python3 by Jonathan Dobson, continued by mmckerns.
-Test against "all" python types (Std. Lib. CH 1-15 @ 2.7) by mmckerns.
-Test against CH16+ Std. Lib. ... TBD.
-"""
-__all__ = ['dump','dumps','load','loads','dump_session','load_session',
-           'Pickler','Unpickler','register','copy','pickle','pickles',
-           'HIGHEST_PROTOCOL','DEFAULT_PROTOCOL','PicklingError',
-           'UnpicklingError','HANDLE_FMODE','CONTENTS_FMODE','FILE_FMODE']
-
-import logging
-log = logging.getLogger("dill")
-log.addHandler(logging.StreamHandler())
-def _trace(boolean):
-    """print a trace through the stack when pickling; useful for debugging"""
-    if boolean: log.setLevel(logging.INFO)
-    else: log.setLevel(logging.WARN)
-    return
-
-import os
-import sys
-diff = None
-_use_diff = False
-PY3 = sys.version_info[0] == 3
-if PY3: #XXX: get types from dill.objtypes ?
-    import builtins as __builtin__
-    from pickle import _Pickler as StockPickler, _Unpickler as StockUnpickler
-    from _thread import LockType
-   #from io import IOBase
-    from types import CodeType, FunctionType, MethodType, GeneratorType, \
-        TracebackType, FrameType, ModuleType, BuiltinMethodType
-    BufferType = memoryview #XXX: unregistered
-    ClassType = type # no 'old-style' classes
-    EllipsisType = type(Ellipsis)
-   #FileType = IOBase
-    NotImplementedType = type(NotImplemented)
-    SliceType = slice
-    TypeType = type # 'new-style' classes #XXX: unregistered
-    XRangeType = range
-    DictProxyType = type(object.__dict__)
-else:
-    import __builtin__
-    from pickle import Pickler as StockPickler, Unpickler as StockUnpickler
-    from thread import LockType
-    from types import CodeType, FunctionType, ClassType, MethodType, \
-         GeneratorType, DictProxyType, XRangeType, SliceType, TracebackType, \
-         NotImplementedType, EllipsisType, FrameType, ModuleType, \
-         BufferType, BuiltinMethodType, TypeType
-from pickle import HIGHEST_PROTOCOL, PicklingError, UnpicklingError
-try:
-    from pickle import DEFAULT_PROTOCOL
-except ImportError:
-    DEFAULT_PROTOCOL = HIGHEST_PROTOCOL
-import __main__ as _main_module
-import marshal
-import gc
-# import zlib
-from weakref import ReferenceType, ProxyType, CallableProxyType
-from functools import partial
-from operator import itemgetter, attrgetter
-# new in python2.5
-if sys.hexversion >= 0x20500f0:
-    from types import MemberDescriptorType, GetSetDescriptorType
-# new in python3.3
-if sys.hexversion < 0x03030000:
-    FileNotFoundError = IOError
-try:
-    import ctypes
-    HAS_CTYPES = True
-except ImportError:
-    HAS_CTYPES = False
-try:
-    from numpy import ufunc as NumpyUfuncType
-    from numpy import ndarray as NumpyArrayType
-    def ndarrayinstance(obj):
-        try:
-            if not isinstance(obj, NumpyArrayType): return False
-        except ReferenceError: return False # handle 'R3' weakref in 3.x
-        # verify that __reduce__ has not been overridden
-        NumpyInstance = NumpyArrayType((0,),'int8')
-        if id(obj.__reduce_ex__) == id(NumpyInstance.__reduce_ex__) and \
-           id(obj.__reduce__) == id(NumpyInstance.__reduce__): return True
-        return False
-except ImportError:
-    NumpyUfuncType = None
-    NumpyArrayType = None
-    def ndarrayinstance(obj): return False
-
-# make sure to add these 'hand-built' types to _typemap
-if PY3:
-    CellType = type((lambda x: lambda y: x)(0).__closure__[0])
-else:
-    CellType = type((lambda x: lambda y: x)(0).func_closure[0])
-WrapperDescriptorType = type(type.__repr__)
-MethodDescriptorType = type(type.__dict__['mro'])
-MethodWrapperType = type([].__repr__)
-PartialType = type(partial(int,base=2))
-SuperType = type(super(Exception, TypeError()))
-ItemGetterType = type(itemgetter(0))
-AttrGetterType = type(attrgetter('__repr__'))
-FileType = type(open(os.devnull, 'rb', buffering=0))
-TextWrapperType = type(open(os.devnull, 'r', buffering=-1))
-BufferedRandomType = type(open(os.devnull, 'r+b', buffering=-1))
-BufferedReaderType = type(open(os.devnull, 'rb', buffering=-1))
-BufferedWriterType = type(open(os.devnull, 'wb', buffering=-1))
-try:
-    from _pyio import open as _open
-    PyTextWrapperType = type(_open(os.devnull, 'r', buffering=-1))
-    PyBufferedRandomType = type(_open(os.devnull, 'r+b', buffering=-1))
-    PyBufferedReaderType = type(_open(os.devnull, 'rb', buffering=-1))
-    PyBufferedWriterType = type(_open(os.devnull, 'wb', buffering=-1))
-except ImportError:
-    PyTextWrapperType = PyBufferedRandomType = PyBufferedReaderType = PyBufferedWriterType = None
-try:
-    from cStringIO import StringIO, InputType, OutputType
-except ImportError:
-    if PY3:
-        from io import BytesIO as StringIO
-    else:
-        from StringIO import StringIO
-    InputType = OutputType = None
-try:
-    __IPYTHON__ is True # is ipython
-    ExitType = None     # IPython.core.autocall.ExitAutocall
-    singletontypes = ['exit', 'quit', 'get_ipython']
-except NameError:
-    try: ExitType = type(exit) # apparently 'exit' can be removed
-    except NameError: ExitType = None
-    singletontypes = []
-
-### File modes
-# Pickles the file handle, preserving mode. The position of the unpickled
-# object is as for a new file handle.
-HANDLE_FMODE = 0
-# Pickles the file contents, creating a new file if on load the file does
-# not exist. The position = min(pickled position, EOF) and mode is chosen
-# as such that "best" preserves behavior of the original file.
-CONTENTS_FMODE = 1
-# Pickles the entire file (handle and contents), preserving mode and position.
-FILE_FMODE = 2
-
-### Shorthands (modified from python2.5/lib/pickle.py)
-def copy(obj, *args, **kwds):
-    """use pickling to 'copy' an object"""
-    return loads(dumps(obj, *args, **kwds))
-
-def dump(obj, file, protocol=None, byref=False, fmode=HANDLE_FMODE):#, strictio=False):
-    """pickle an object to a file"""
-    strictio = False #FIXME: strict=True needs cleanup
-    if protocol is None: protocol = DEFAULT_PROTOCOL
-    pik = Pickler(file, protocol)
-    pik._main_module = _main_module
-    # save settings
-    _byref = pik._byref
-    _strictio = pik._strictio
-    _fmode = pik._fmode
-    # apply kwd settings
-    pik._byref = bool(byref)
-    pik._strictio = bool(strictio)
-    pik._fmode = fmode
-    # hack to catch subclassed numpy array instances
-    if NumpyArrayType and ndarrayinstance(obj):
-        @register(type(obj))
-        def save_numpy_array(pickler, obj):
-            log.info("Nu: (%s, %s)" % (obj.shape,obj.dtype))
-            npdict = getattr(obj, '__dict__', None)
-            f, args, state = obj.__reduce__()
-            pik.save_reduce(_create_array, (f, args, state, npdict), obj=obj)
-            return
-    # end hack
-    pik.dump(obj)
-    # return to saved settings
-    pik._byref = _byref
-    pik._strictio = _strictio
-    pik._fmode = _fmode
-    return
-
-def dumps(obj, protocol=None, byref=False, fmode=HANDLE_FMODE):#, strictio=False):
-    """pickle an object to a string"""
-    file = StringIO()
-    dump(obj, file, protocol, byref, fmode)#, strictio)
-    return file.getvalue()
-
-def load(file):
-    """unpickle an object from a file"""
-    pik = Unpickler(file)
-    pik._main_module = _main_module
-    obj = pik.load()
-    if type(obj).__module__ == _main_module.__name__: # point obj class to main
-        try: obj.__class__ == getattr(pik._main_module, type(obj).__name__)
-        except AttributeError: pass # defined in a file
-   #_main_module.__dict__.update(obj.__dict__) #XXX: should update globals ?
-    return obj
-
-def loads(str):
-    """unpickle an object from a string"""
-    file = StringIO(str)
-    return load(file)
-
-# def dumpzs(obj, protocol=None):
-#     """pickle an object to a compressed string"""
-#     return zlib.compress(dumps(obj, protocol))
-
-# def loadzs(str):
-#     """unpickle an object from a compressed string"""
-#     return loads(zlib.decompress(str))
-
-### End: Shorthands ###
-
-### Pickle the Interpreter Session
-def dump_session(filename='/tmp/session.pkl', main_module=_main_module):
-    """pickle the current state of __main__ to a file"""
-    f = open(filename, 'wb')
-    try:
-        pickler = Pickler(f, 2)
-        pickler._main_module = main_module
-        _byref = pickler._byref
-        pickler._byref = False  # disable pickling by name reference
-        pickler._session = True # is best indicator of when pickling a session
-        pickler.dump(main_module)
-        pickler._session = False
-        pickler._byref = _byref
-    finally:
-        f.close()
-    return
-
-def load_session(filename='/tmp/session.pkl', main_module=_main_module):
-    """update the __main__ module with the state from the session file"""
-    f = open(filename, 'rb')
-    try:
-        unpickler = Unpickler(f)
-        unpickler._main_module = main_module
-        unpickler._session = True
-        module = unpickler.load()
-        unpickler._session = False
-        main_module.__dict__.update(module.__dict__)
-    finally:
-        f.close()
-    return
-
-### End: Pickle the Interpreter
-
-### Extend the Picklers
-class Pickler(StockPickler):
-    """python's Pickler extended to interpreter sessions"""
-    dispatch = StockPickler.dispatch.copy()
-    _main_module = None
-    _session = False
-    _byref = False
-    _strictio = False
-    _fmode = HANDLE_FMODE
-    pass
-
-    def __init__(self, *args, **kwargs):
-        StockPickler.__init__(self, *args, **kwargs)
-        self._main_module = _main_module
-        self._diff_cache = {}
-
-class Unpickler(StockUnpickler):
-    """python's Unpickler extended to interpreter sessions and more types"""
-    _main_module = None
-    _session = False
-
-    def find_class(self, module, name):
-        if (module, name) == ('__builtin__', '__main__'):
-            return self._main_module.__dict__ #XXX: above set w/save_module_dict
-        return StockUnpickler.find_class(self, module, name)
-    pass
-
-    def __init__(self, *args, **kwargs):
-        StockUnpickler.__init__(self, *args, **kwargs)
-        self._main_module = _main_module
-
-'''
-def dispatch_table():
-    """get the dispatch table of registered types"""
-    return Pickler.dispatch
-'''
-
-pickle_dispatch_copy = StockPickler.dispatch.copy()
-
-def pickle(t, func):
-    """expose dispatch table for user-created extensions"""
-    Pickler.dispatch[t] = func
-    return
-
-def register(t):
-    def proxy(func):
-        Pickler.dispatch[t] = func
-        return func
-    return proxy
-
-def _revert_extension():
-    for type, func in list(StockPickler.dispatch.items()):
-        if func.__module__ == __name__:
-            del StockPickler.dispatch[type]
-            if type in pickle_dispatch_copy:
-                StockPickler.dispatch[type] = pickle_dispatch_copy[type]
-
-def use_diff(on=True):
-    """
-    reduces size of pickles by only including object which have changed.
-    Decreases pickle size but increases CPU time needed.
-    Also helps avoid some unpicklable objects.
-    MUST be called at start of script, otherwise changes will not be recorded.
-    """
-    global _use_diff, diff
-    _use_diff = on
-    if _use_diff and diff is None:
-        try:
-            from . import diff as d
-        except:
-            import diff as d
-        diff = d
-
-def _create_typemap():
-    import types
-    if PY3:
-        d = dict(list(__builtin__.__dict__.items()) + \
-                 list(types.__dict__.items())).items()
-        builtin = 'builtins'
-    else:
-        d = types.__dict__.iteritems()
-        builtin = '__builtin__'
-    for key, value in d:
-        if getattr(value, '__module__', None) == builtin \
-        and type(value) is type:
-            yield key, value
-    return
-_reverse_typemap = dict(_create_typemap())
-_reverse_typemap.update({
-    'CellType': CellType,
-    'WrapperDescriptorType': WrapperDescriptorType,
-    'MethodDescriptorType': MethodDescriptorType,
-    'MethodWrapperType': MethodWrapperType,
-    'PartialType': PartialType,
-    'SuperType': SuperType,
-    'ItemGetterType': ItemGetterType,
-    'AttrGetterType': AttrGetterType,
-    'FileType': FileType,
-    'BufferedRandomType': BufferedRandomType,
-    'BufferedReaderType': BufferedReaderType,
-    'BufferedWriterType': BufferedWriterType,
-    'TextWrapperType': TextWrapperType,
-    'PyBufferedRandomType': PyBufferedRandomType,
-    'PyBufferedReaderType': PyBufferedReaderType,
-    'PyBufferedWriterType': PyBufferedWriterType,
-    'PyTextWrapperType': PyTextWrapperType,
-})
-if ExitType:
-    _reverse_typemap['ExitType'] = ExitType
-if InputType:
-    _reverse_typemap['InputType'] = InputType
-    _reverse_typemap['OutputType'] = OutputType
-if PY3:
-    _typemap = dict((v, k) for k, v in _reverse_typemap.items())
-else:
-    _typemap = dict((v, k) for k, v in _reverse_typemap.iteritems())
-
-def _unmarshal(string):
-    return marshal.loads(string)
-
-def _load_type(name):
-    return _reverse_typemap[name]
-
-def _create_type(typeobj, *args):
-    return typeobj(*args)
-
-def _create_function(fcode, fglobals, fname=None, fdefaults=None, \
-                                      fclosure=None, fdict=None):
-    # same as FunctionType, but enable passing __dict__ to new function,
-    # __dict__ is the storehouse for attributes added after function creation
-    if fdict is None: fdict = dict()
-    func = FunctionType(fcode, fglobals, fname, fdefaults, fclosure)
-    func.__dict__.update(fdict) #XXX: better copy? option to copy?
-    return func
-
-def _create_ftype(ftypeobj, func, args, kwds):
-    if kwds is None:
-        kwds = {}
-    if args is None:
-        args = ()
-    return ftypeobj(func, *args, **kwds)
-
-def _create_lock(locked, *args):
-    from threading import Lock
-    lock = Lock()
-    if locked:
-        if not lock.acquire(False):
-            raise UnpicklingError("Cannot acquire lock")
-    return lock
-
-# thanks to matsjoyce for adding all the different file modes
-def _create_filehandle(name, mode, position, closed, open, strictio, fmode, fdata): # buffering=0
-    # only pickles the handle, not the file contents... good? or StringIO(data)?
-    # (for file contents see: http://effbot.org/librarybook/copy-reg.htm)
-    # NOTE: handle special cases first (are there more special cases?)
-    names = {'<stdin>':sys.__stdin__, '<stdout>':sys.__stdout__,
-             '<stderr>':sys.__stderr__} #XXX: better fileno=(0,1,2) ?
-    if name in list(names.keys()):
-        f = names[name] #XXX: safer "f=sys.stdin"
-    elif name == '<tmpfile>':
-        f = os.tmpfile()
-    elif name == '<fdopen>':
-        import tempfile
-        f = tempfile.TemporaryFile(mode)
-    else:
-        # treat x mode as w mode
-        if "x" in mode and sys.hexversion < 0x03030000:
-            raise ValueError("invalid mode: '%s'" % mode)
-
-        if not os.path.exists(name):
-            if strictio:
-                raise FileNotFoundError("[Errno 2] No such file or directory: '%s'" % name)
-            elif "r" in mode and fmode != FILE_FMODE:
-                name = '<fdopen>' # or os.devnull?
-            current_size = 0 # or maintain position?
-        else:
-            current_size = os.path.getsize(name)
-
-        if position > current_size:
-            if strictio:
-                raise ValueError("invalid buffer size")
-            elif fmode == CONTENTS_FMODE:
-                position = current_size
-        # try to open the file by name
-        # NOTE: has different fileno
-        try:
-            #FIXME: missing: *buffering*, encoding, softspace
-            if fmode == FILE_FMODE:
-                f = open(name, mode if "w" in mode else "w")
-                f.write(fdata)
-                if "w" not in mode:
-                    f.close()
-                    f = open(name, mode)
-            elif name == '<fdopen>': # file did not exist
-                import tempfile
-                f = tempfile.TemporaryFile(mode)
-            elif fmode == CONTENTS_FMODE \
-               and ("w" in mode or "x" in mode):
-                # stop truncation when opening
-                flags = os.O_CREAT
-                if "+" in mode:
-                    flags |= os.O_RDWR
-                else:
-                    flags |= os.O_WRONLY
-                f = os.fdopen(os.open(name, flags), mode)
-                # set name to the correct value
-                if PY3:
-                    r = getattr(f, "buffer", f)
-                    r = getattr(r, "raw", r)
-                    r.name = name
-                else:
-                    class FILE(ctypes.Structure):
-                        _fields_ = [("refcount", ctypes.c_long),
-                                    ("type_obj", ctypes.py_object),
-                                    ("file_pointer", ctypes.c_voidp),
-                                    ("name", ctypes.py_object)]
-
-                    class PyObject(ctypes.Structure):
-                        _fields_ = [
-                            ("ob_refcnt", ctypes.c_int),
-                            ("ob_type", ctypes.py_object)
-                            ]
-                    if not HAS_CTYPES:
-                        raise ImportError("No module named 'ctypes'")
-                    ctypes.cast(id(f), ctypes.POINTER(FILE)).contents.name = name
-                    ctypes.cast(id(name), ctypes.POINTER(PyObject)).contents.ob_refcnt += 1
-                assert f.name == name
-            else:
-                f = open(name, mode)
-        except (IOError, FileNotFoundError):
-            err = sys.exc_info()[1]
-            raise UnpicklingError(err)
-    if closed:
-        f.close()
-    elif position >= 0 and fmode != HANDLE_FMODE:
-        f.seek(position)
-    return f
-
-def _create_stringi(value, position, closed):
-    f = StringIO(value)
-    if closed: f.close()
-    else: f.seek(position)
-    return f
-
-def _create_stringo(value, position, closed):
-    f = StringIO()
-    if closed: f.close()
-    else:
-       f.write(value)
-       f.seek(position)
-    return f
-
-class _itemgetter_helper(object):
-    def __init__(self):
-        self.items = []
-    def __getitem__(self, item):
-        self.items.append(item)
-        return
-
-class _attrgetter_helper(object):
-    def __init__(self, attrs, index=None):
-        self.attrs = attrs
-        self.index = index
-    def __getattribute__(self, attr):
-        attrs = object.__getattribute__(self, "attrs")
-        index = object.__getattribute__(self, "index")
-        if index is None:
-            index = len(attrs)
-            attrs.append(attr)
-        else:
-            attrs[index] = ".".join([attrs[index], attr])
-        return type(self)(attrs, index)
-
-if HAS_CTYPES:
-    ctypes.pythonapi.PyCell_New.restype = ctypes.py_object
-    ctypes.pythonapi.PyCell_New.argtypes = [ctypes.py_object]
-    # thanks to Paul Kienzle for cleaning the ctypes CellType logic
-    def _create_cell(contents):
-        return ctypes.pythonapi.PyCell_New(contents)
-
-def _create_weakref(obj, *args):
-    from weakref import ref
-    if obj is None: # it's dead
-        if PY3:
-            from collections import UserDict
-        else:
-            from UserDict import UserDict
-        return ref(UserDict(), *args)
-    return ref(obj, *args)
-
-def _create_weakproxy(obj, callable=False, *args):
-    from weakref import proxy
-    if obj is None: # it's dead
-        if callable: return proxy(lambda x:x, *args)
-        if PY3:
-            from collections import UserDict
-        else:
-            from UserDict import UserDict
-        return proxy(UserDict(), *args)
-    return proxy(obj, *args)
-
-def _eval_repr(repr_str):
-    return eval(repr_str)
-
-def _create_array(f, args, state, npdict=None):
-   #array = numpy.core.multiarray._reconstruct(*args)
-    array = f(*args)
-    array.__setstate__(state)
-    if npdict is not None: # we also have saved state in __dict__
-        array.__dict__.update(npdict)
-    return array
-
-def _getattr(objclass, name, repr_str):
-    # hack to grab the reference directly
-    try: #XXX: works only for __builtin__ ?
-        attr = repr_str.split("'")[3]
-        return eval(attr+'.__dict__["'+name+'"]')
-    except:
-        attr = getattr(objclass,name)
-        if name == '__dict__':
-            attr = attr[name]
-        return attr
-
-def _get_attr(self, name):
-    # stop recursive pickling
-    return getattr(self, name)
-
-def _dict_from_dictproxy(dictproxy):
-    _dict = dictproxy.copy() # convert dictproxy to dict
-    _dict.pop('__dict__', None)
-    _dict.pop('__weakref__', None)
-    return _dict
-
-def _import_module(import_name, safe=False):
-    try:
-        if '.' in import_name:
-            items = import_name.split('.')
-            module = '.'.join(items[:-1])
-            obj = items[-1]
-        else:
-            return __import__(import_name)
-        return getattr(__import__(module, None, None, [obj]), obj)
-    except (ImportError, AttributeError):
-        if safe:
-            return None
-        raise
-
-def _locate_function(obj, session=False):
-    if obj.__module__ == '__main__': # and session:
-        return False
-    found = _import_module(obj.__module__ + '.' + obj.__name__, safe=True)
-    return found is obj
-
-@register(CodeType)
-def save_code(pickler, obj):
-    log.info("Co: %s" % obj)
-    pickler.save_reduce(_unmarshal, (marshal.dumps(obj),), obj=obj)
-    return
-
-@register(FunctionType)
-def save_function(pickler, obj):
-    if not _locate_function(obj): #, pickler._session):
-        log.info("F1: %s" % obj)
-        if PY3:
-            pickler.save_reduce(_create_function, (obj.__code__, 
-                                obj.__globals__, obj.__name__,
-                                obj.__defaults__, obj.__closure__,
-                                obj.__dict__), obj=obj)
-        else:
-            pickler.save_reduce(_create_function, (obj.func_code,
-                                obj.func_globals, obj.func_name,
-                                obj.func_defaults, obj.func_closure,
-                                obj.__dict__), obj=obj)
-    else:
-        log.info("F2: %s" % obj)
-        StockPickler.save_global(pickler, obj) #NOTE: also takes name=...
-    return
-
-@register(dict)
-def save_module_dict(pickler, obj):
-    if is_dill(pickler) and obj == pickler._main_module.__dict__ and not pickler._session:
-        log.info("D1: <dict%s" % str(obj.__repr__).split('dict')[-1]) # obj
-        if PY3:
-            pickler.write(bytes('c__builtin__\n__main__\n', 'UTF-8'))
-        else:
-            pickler.write('c__builtin__\n__main__\n')
-    elif not is_dill(pickler) and obj == _main_module.__dict__:
-        log.info("D3: <dict%s" % str(obj.__repr__).split('dict')[-1]) # obj
-        if PY3:
-            pickler.write(bytes('c__main__\n__dict__\n', 'UTF-8'))
-        else:
-            pickler.write('c__main__\n__dict__\n')   #XXX: works in general?
-    elif '__name__' in obj and obj != _main_module.__dict__ \
-    and obj is getattr(_import_module(obj['__name__'],True), '__dict__', None):
-        log.info("D4: <dict%s" % str(obj.__repr__).split('dict')[-1]) # obj
-        if PY3:
-            pickler.write(bytes('c%s\n__dict__\n' % obj['__name__'], 'UTF-8'))
-        else:
-            pickler.write('c%s\n__dict__\n' % obj['__name__'])
-    else:
-        log.info("D2: <dict%s" % str(obj.__repr__).split('dict')[-1]) # obj
-        if is_dill(pickler) and pickler._session:
-            # we only care about session the first pass thru
-            pickler._session = False 
-        StockPickler.save_dict(pickler, obj)
-    return
-
-@register(ClassType)
-def save_classobj(pickler, obj):
-    if obj.__module__ == '__main__': #XXX: use _main_module.__name__ everywhere?
-        log.info("C1: %s" % obj)
-        pickler.save_reduce(ClassType, (obj.__name__, obj.__bases__,
-                                        obj.__dict__), obj=obj)
-                                       #XXX: or obj.__dict__.copy()), obj=obj) ?
-    else:
-        log.info("C2: %s" % obj)
-        StockPickler.save_global(pickler, obj)
-    return
-
-@register(LockType)
-def save_lock(pickler, obj):
-    log.info("Lo: %s" % obj)
-    pickler.save_reduce(_create_lock, (obj.locked(),), obj=obj)
-    return
-
-@register(ItemGetterType)
-def save_itemgetter(pickler, obj):
-    log.info("Ig: %s" % obj)
-    helper = _itemgetter_helper()
-    obj(helper)
-    pickler.save_reduce(type(obj), tuple(helper.items), obj=obj)
-    return
-
-@register(AttrGetterType)
-def save_attrgetter(pickler, obj):
-    log.info("Ag: %s" % obj)
-    attrs = []
-    helper = _attrgetter_helper(attrs)
-    obj(helper)
-    pickler.save_reduce(type(obj), tuple(attrs), obj=obj)
-    return
-
-def _save_file(pickler, obj, open_):
-    obj.flush()
-    if obj.closed:
-        position = None
-    else:
-        if obj in (sys.__stdout__, sys.__stderr__, sys.__stdin__):
-            position = -1
-        else:
-            position = obj.tell()
-    if pickler._fmode == FILE_FMODE:
-        f = open_(obj.name, "r")
-        fdata = f.read()
-        f.close()
-    else:
-        fdata = ""
-    strictio = pickler._strictio
-    fmode = pickler._fmode
-    pickler.save_reduce(_create_filehandle, (obj.name, obj.mode, position,
-                                             obj.closed, open_, strictio,
-                                             fmode, fdata), obj=obj)
-    return
-
-
-@register(FileType) #XXX: in 3.x has buffer=0, needs different _create?
-@register(BufferedRandomType)
-@register(BufferedReaderType)
-@register(BufferedWriterType)
-@register(TextWrapperType)
-def save_file(pickler, obj):
-    log.info("Fi: %s" % obj)
-    return _save_file(pickler, obj, open)
-
-if PyTextWrapperType:
-    @register(PyBufferedRandomType)
-    @register(PyBufferedReaderType)
-    @register(PyBufferedWriterType)
-    @register(PyTextWrapperType)
-    def save_file(pickler, obj):
-        log.info("Fi: %s" % obj)
-        return _save_file(pickler, obj, _open)
-
-# The following two functions are based on 'saveCStringIoInput'
-# and 'saveCStringIoOutput' from spickle
-# Copyright (c) 2011 by science+computing ag
-# License: http://www.apache.org/licenses/LICENSE-2.0
-if InputType:
-    @register(InputType)
-    def save_stringi(pickler, obj):
-        log.info("Io: %s" % obj)
-        if obj.closed:
-            value = ''; position = None
-        else:
-            value = obj.getvalue(); position = obj.tell()
-        pickler.save_reduce(_create_stringi, (value, position, \
-                                              obj.closed), obj=obj)
-        return
-
-    @register(OutputType)
-    def save_stringo(pickler, obj):
-        log.info("Io: %s" % obj)
-        if obj.closed:
-            value = ''; position = None
-        else:
-            value = obj.getvalue(); position = obj.tell()
-        pickler.save_reduce(_create_stringo, (value, position, \
-                                              obj.closed), obj=obj)
-        return
-
-@register(PartialType)
-def save_functor(pickler, obj):
-    log.info("Fu: %s" % obj)
-    pickler.save_reduce(_create_ftype, (type(obj), obj.func, obj.args,
-                                        obj.keywords), obj=obj)
-    return
-
-@register(SuperType)
-def save_functor(pickler, obj):
-    log.info("Su: %s" % obj)
-    pickler.save_reduce(super, (obj.__thisclass__, obj.__self__), obj=obj)
-    return
-
-@register(BuiltinMethodType)
-def save_builtin_method(pickler, obj):
-    if obj.__self__ is not None:
-        log.info("B1: %s" % obj)
-        pickler.save_reduce(_get_attr, (obj.__self__, obj.__name__), obj=obj)
-    else:
-        log.info("B2: %s" % obj)
-        StockPickler.save_global(pickler, obj)
-    return
-
-@register(MethodType) #FIXME: fails for 'hidden' or 'name-mangled' classes
-def save_instancemethod0(pickler, obj):# example: cStringIO.StringI
-    log.info("Me: %s" % obj) #XXX: obj.__dict__ handled elsewhere?
-    if PY3:
-        pickler.save_reduce(MethodType, (obj.__func__, obj.__self__), obj=obj)
-    else:
-        pickler.save_reduce(MethodType, (obj.im_func, obj.im_self,
-                                         obj.im_class), obj=obj)
-    return
-
-if sys.hexversion >= 0x20500f0:
-    @register(MemberDescriptorType)
-    @register(GetSetDescriptorType)
-    @register(MethodDescriptorType)
-    @register(WrapperDescriptorType)
-    def save_wrapper_descriptor(pickler, obj):
-        log.info("Wr: %s" % obj)
-        pickler.save_reduce(_getattr, (obj.__objclass__, obj.__name__,
-                                       obj.__repr__()), obj=obj)
-        return
-
-    @register(MethodWrapperType)
-    def save_instancemethod(pickler, obj):
-        log.info("Mw: %s" % obj)
-        pickler.save_reduce(getattr, (obj.__self__, obj.__name__), obj=obj)
-        return
-else:
-    @register(MethodDescriptorType)
-    @register(WrapperDescriptorType)
-    def save_wrapper_descriptor(pickler, obj):
-        log.info("Wr: %s" % obj)
-        pickler.save_reduce(_getattr, (obj.__objclass__, obj.__name__,
-                                       obj.__repr__()), obj=obj)
-        return
-
-if HAS_CTYPES:
-    @register(CellType)
-    def save_cell(pickler, obj):
-        log.info("Ce: %s" % obj)
-        pickler.save_reduce(_create_cell, (obj.cell_contents,), obj=obj)
-        return
- 
-# The following function is based on 'saveDictProxy' from spickle
-# Copyright (c) 2011 by science+computing ag
-# License: http://www.apache.org/licenses/LICENSE-2.0
-@register(DictProxyType)
-def save_dictproxy(pickler, obj):
-    log.info("Dp: %s" % obj)
-    attr = obj.get('__dict__')
-   #pickler.save_reduce(_create_dictproxy, (attr,'nested'), obj=obj)
-    if type(attr) == GetSetDescriptorType and attr.__name__ == "__dict__" \
-    and getattr(attr.__objclass__, "__dict__", None) == obj:
-        pickler.save_reduce(getattr, (attr.__objclass__, "__dict__"), obj=obj)
-        return
-    # all bad below... so throw ReferenceError or TypeError
-    from weakref import ReferenceError
-    raise ReferenceError("%s does not reference a class __dict__" % obj)
-
-@register(SliceType)
-def save_slice(pickler, obj):
-    log.info("Sl: %s" % obj)
-    pickler.save_reduce(slice, (obj.start, obj.stop, obj.step), obj=obj)
-    return
-
-@register(XRangeType)
-@register(EllipsisType)
-@register(NotImplementedType)
-def save_singleton(pickler, obj):
-    log.info("Si: %s" % obj)
-    pickler.save_reduce(_eval_repr, (obj.__repr__(),), obj=obj)
-    return
-
-# thanks to Paul Kienzle for pointing out ufuncs didn't pickle
-if NumpyArrayType:
-    @register(NumpyUfuncType)
-    def save_numpy_ufunc(pickler, obj):
-        log.info("Nu: %s" % obj)
-        StockPickler.save_global(pickler, obj)
-        return
-# NOTE: the above 'save' performs like:
-#   import copy_reg
-#   def udump(f): return f.__name__
-#   def uload(name): return getattr(numpy, name)
-#   copy_reg.pickle(NumpyUfuncType, udump, uload)
-
-def _proxy_helper(obj): # a dead proxy returns a reference to None
-    """get memory address of proxy's reference object"""
-    try: #FIXME: has to be a smarter way to identify if it's a proxy
-        address = int(repr(obj).rstrip('>').split(' at ')[-1], base=16)
-    except ValueError: # has a repr... is thus probably not a proxy
-        address = id(obj)
-    return address
-
-def _locate_object(address, module=None):
-    """get object located at the given memory address (inverse of id(obj))"""
-    special = [None, True, False] #XXX: more...?
-    for obj in special:
-        if address == id(obj): return obj
-    if module:
-        if PY3:
-            objects = iter(module.__dict__.values())
-        else:
-            objects = module.__dict__.itervalues()
-    else: objects = iter(gc.get_objects())
-    for obj in objects:
-        if address == id(obj): return obj
-    # all bad below... nothing found so throw ReferenceError or TypeError
-    from weakref import ReferenceError
-    try: address = hex(address)
-    except TypeError:
-        raise TypeError("'%s' is not a valid memory address" % str(address))
-    raise ReferenceError("Cannot reference object at '%s'" % address)
-
-@register(ReferenceType)
-def save_weakref(pickler, obj):
-    refobj = obj()
-    log.info("R1: %s" % obj)
-   #refobj = ctypes.pythonapi.PyWeakref_GetObject(obj) # dead returns "None"
-    pickler.save_reduce(_create_weakref, (refobj,), obj=obj)
-    return
-
-@register(ProxyType)
-@register(CallableProxyType)
-def save_weakproxy(pickler, obj):
-    refobj = _locate_object(_proxy_helper(obj))
-    try: log.info("R2: %s" % obj)
-    except ReferenceError: log.info("R3: %s" % sys.exc_info()[1])
-   #callable = bool(getattr(refobj, '__call__', None))
-    if type(obj) is CallableProxyType: callable = True
-    else: callable = False
-    pickler.save_reduce(_create_weakproxy, (refobj, callable), obj=obj)
-    return
-
-@register(ModuleType)
-def save_module(pickler, obj):
-    if False: #_use_diff:
-        if obj.__name__ != "dill":
-            try:
-                changed = diff.whats_changed(obj, seen=pickler._diff_cache)[0]
-            except RuntimeError:  # not memorised module, probably part of dill
-                pass
-            else:
-                log.info("M1: %s with diff" % obj)
-                log.info("Diff: %s", changed.keys())
-                pickler.save_reduce(_import_module, (obj.__name__,), obj=obj,
-                                    state=changed)
-                return
-
-        log.info("M2: %s" % obj)
-        pickler.save_reduce(_import_module, (obj.__name__,), obj=obj)
-    else:
-        # if a module file name starts with prefx, it should be a builtin
-        # module, so should be pickled as a reference
-        prefix = getattr(sys, "base_prefix", sys.prefix)
-        std_mod = getattr(obj, "__file__", prefix).startswith(prefix)
-        if obj.__name__ not in ("builtins", "dill") \
-           and not std_mod or is_dill(pickler) and obj is pickler._main_module:
-            log.info("M1: %s" % obj)
-            _main_dict = obj.__dict__.copy() #XXX: better no copy? option to copy?
-            [_main_dict.pop(item, None) for item in singletontypes
-                + ["__builtins__", "__loader__"]]
-            pickler.save_reduce(_import_module, (obj.__name__,), obj=obj,
-                                state=_main_dict)
-        else:
-            log.info("M2: %s" % obj)
-            pickler.save_reduce(_import_module, (obj.__name__,), obj=obj)
-        return
-    return
-
-@register(TypeType)
-def save_type(pickler, obj):
-    if obj in _typemap:
-        log.info("T1: %s" % obj)
-        pickler.save_reduce(_load_type, (_typemap[obj],), obj=obj)
-    elif obj.__module__ == '__main__':
-        try: # use StockPickler for special cases [namedtuple,]
-            [getattr(obj, attr) for attr in ('_fields','_asdict',
-                                             '_make','_replace')]
-            log.info("T6: %s" % obj)
-            StockPickler.save_global(pickler, obj)
-            return
-        except AttributeError: pass
-        if type(obj) == type:
-        #   try: # used when pickling the class as code (or the interpreter)
-            if is_dill(pickler) and not pickler._byref:
-                # thanks to Tom Stepleton pointing out pickler._session unneeded
-                log.info("T2: %s" % obj)
-                _dict = _dict_from_dictproxy(obj.__dict__)
-        #   except: # punt to StockPickler (pickle by class reference)
-            else:
-                log.info("T5: %s" % obj)
-                StockPickler.save_global(pickler, obj)
-                return
-        else:
-            log.info("T3: %s" % obj)
-            _dict = obj.__dict__
-       #print (_dict)
-       #print ("%s\n%s" % (type(obj), obj.__name__))
-       #print ("%s\n%s" % (obj.__bases__, obj.__dict__))
-        pickler.save_reduce(_create_type, (type(obj), obj.__name__,
-                                           obj.__bases__, _dict), obj=obj)
-    else:
-        log.info("T4: %s" % obj)
-       #print (obj.__dict__)
-       #print ("%s\n%s" % (type(obj), obj.__name__))
-       #print ("%s\n%s" % (obj.__bases__, obj.__dict__))
-        StockPickler.save_global(pickler, obj)
-    return
-
-# quick sanity checking
-def pickles(obj,exact=False,safe=False,**kwds):
-    """quick check if object pickles with dill"""
-    if safe: exceptions = (Exception,) # RuntimeError, ValueError
-    else:
-        exceptions = (TypeError, AssertionError, PicklingError, UnpicklingError)
-    try:
-        pik = copy(obj, **kwds)
-        try:
-            result = bool(pik.all() == obj.all())
-        except AttributeError:
-            result = pik == obj
-        if result: return True
-        if not exact:
-            return type(pik) == type(obj)
-        return False
-    except exceptions:
-        return False
-
-# use to protect against missing attributes
-def is_dill(pickler):
-    "check the dill-ness of your pickler"
-    return 'dill' in pickler.__module__
-   #return hasattr(pickler,'_main_module')
-
-def _extend():
-    """extend pickle with all of dill's registered types"""
-    # need to have pickle not choke on _main_module?  use is_dill(pickler)
-    for t,func in Pickler.dispatch.items():
-        try:
-            StockPickler.dispatch[t] = func
-        except: #TypeError, PicklingError, UnpicklingError
-            log.info("skip: %s" % t)
-        else: pass
-    return
-
-del diff, _use_diff, use_diff
-
-# EOF

http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py
deleted file mode 100644
index bf0b557..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/objtypes.py
+++ /dev/null
@@ -1,27 +0,0 @@
-#!/usr/bin/env python
-#
-# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
-# Copyright (c) 2008-2014 California Institute of Technology.
-# License: 3-clause BSD.  The full license text is available at:
-#  - http://trac.mystic.cacr.caltech.edu/project/pathos/browser/dill/LICENSE
-"""
-all Python Standard Library object types (currently: CH 1-15 @ 2.7)
-and some other common object types (i.e. numpy.ndarray)
-
-to load more objects and types, use dill.load_types()
-"""
-
-from __future__ import absolute_import
-
-# non-local import of dill.objects
-from dill import objects
-for _type in objects.keys():
-    exec("%s = type(objects['%s'])" % (_type,_type))
-    
-del objects
-try:
-    del _type
-except NameError:
-    pass
-
-del absolute_import

http://git-wip-us.apache.org/repos/asf/flink/blob/68b15593/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py
----------------------------------------------------------------------
diff --git a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py b/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py
deleted file mode 100644
index 25714ea..0000000
--- a/flink-staging/flink-language-binding/flink-python/src/main/python/org/apache/flink/languagebinding/api/python/dill/pointers.py
+++ /dev/null
@@ -1,122 +0,0 @@
-#!/usr/bin/env python
-#
-# Author: Mike McKerns (mmckerns @caltech and @uqfoundation)
-# Copyright (c) 2008-2014 California Institute of Technology.
-# License: 3-clause BSD.  The full license text is available at:
-#  - http://trac.mystic.cacr.caltech.edu/project/pathos/browser/dill/LICENSE
-
-from __future__ import absolute_import
-__all__ = ['parent', 'reference', 'at', 'parents', 'children']
-
-import gc
-import sys
-
-from .dill import _proxy_helper as reference
-from .dill import _locate_object as at
-
-def parent(obj, objtype, ignore=()):
-    """
->>> listiter = iter([4,5,6,7])
->>> obj = parent(listiter, list)
->>> obj == [4,5,6,7]  # actually 'is', but don't have handle any longer
-True
-
-NOTE: objtype can be a single type (e.g. int or list) or a tuple of types.
-
-WARNING: if obj is a sequence (e.g. list), may produce unexpected results.
-Parent finds *one* parent (e.g. the last member of the sequence).
-    """
-    depth = 1 #XXX: always looking for the parent (only, right?)
-    chain = parents(obj, objtype, depth, ignore)
-    parent = chain.pop()
-    if parent is obj:
-        return None
-    return parent
-
-
-def parents(obj, objtype, depth=1, ignore=()): #XXX: objtype=object ?
-    """Find the chain of referents for obj. Chain will end with obj.
-
-    objtype: an object type or tuple of types to search for
-    depth: search depth (e.g. depth=2 is 'grandparents')
-    ignore: an object or tuple of objects to ignore in the search
-    """
-    edge_func = gc.get_referents # looking for refs, not back_refs
-    predicate = lambda x: isinstance(x, objtype) # looking for parent type
-   #if objtype is None: predicate = lambda x: True #XXX: in obj.mro() ?
-    ignore = (ignore,) if not hasattr(ignore, '__len__') else ignore
-    ignore = (id(obj) for obj in ignore)
-    chain = find_chain(obj, predicate, edge_func, depth)[::-1]
-    #XXX: should pop off obj... ?
-    return chain
-
-
-def children(obj, objtype, depth=1, ignore=()): #XXX: objtype=object ?
-    """Find the chain of referrers for obj. Chain will start with obj.
-
-    objtype: an object type or tuple of types to search for
-    depth: search depth (e.g. depth=2 is 'grandchildren')
-    ignore: an object or tuple of objects to ignore in the search
-
-    NOTE: a common thing to ignore is all globals, 'ignore=(globals(),)'
-
-    NOTE: repeated calls may yield different results, as python stores
-    the last value in the special variable '_'; thus, it is often good
-    to execute something to replace '_' (e.g. >>> 1+1).
-    """
-    edge_func = gc.get_referrers # looking for back_refs, not refs
-    predicate = lambda x: isinstance(x, objtype) # looking for child type
-   #if objtype is None: predicate = lambda x: True #XXX: in obj.mro() ?
-    ignore = (ignore,) if not hasattr(ignore, '__len__') else ignore
-    ignore = (id(obj) for obj in ignore)
-    chain = find_chain(obj, predicate, edge_func, depth, ignore)
-    #XXX: should pop off obj... ?
-    return chain
-
-
-# more generic helper function (cut-n-paste from objgraph)
-# Source at http://mg.pov.lt/objgraph/
-# Copyright (c) 2008-2010 Marius Gedminas <ma...@pov.lt>
-# Copyright (c) 2010 Stefano Rivera <st...@rivera.za.net>
-# Released under the MIT licence (see objgraph/objgrah.py)
-
-def find_chain(obj, predicate, edge_func, max_depth=20, extra_ignore=()):
-    queue = [obj]
-    depth = {id(obj): 0}
-    parent = {id(obj): None}
-    ignore = set(extra_ignore)
-    ignore.add(id(extra_ignore))
-    ignore.add(id(queue))
-    ignore.add(id(depth))
-    ignore.add(id(parent))
-    ignore.add(id(ignore))
-    ignore.add(id(sys._getframe()))  # this function
-    ignore.add(id(sys._getframe(1))) # find_chain/find_backref_chain, likely
-    gc.collect()
-    while queue:
-        target = queue.pop(0)
-        if predicate(target):
-            chain = [target]
-            while parent[id(target)] is not None:
-                target = parent[id(target)]
-                chain.append(target)
-            return chain
-        tdepth = depth[id(target)]
-        if tdepth < max_depth:
-            referrers = edge_func(target)
-            ignore.add(id(referrers))
-            for source in referrers:
-                if id(source) in ignore:
-                    continue
-                if id(source) not in depth:
-                    depth[id(source)] = tdepth + 1
-                    parent[id(source)] = target
-                    queue.append(source)
-    return [obj] # not found
-
-
-# backward compatability
-refobject = at
-
-
-# EOF