You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2018/01/22 14:50:37 UTC

[GitHub] flink pull request #5333: Review5886

GitHub user zentol opened a pull request:

    https://github.com/apache/flink/pull/5333

    Review5886

    This PR is an extension of #3838 resolving all issues that i found during the review. The change log below is roughly grouped into categories to provide a better overview.
    
    Change log:
    
    General:
    - rebase branch to current master
    - incremented version to 1.5-SNAPSHOT
    - fixed kafka-connector dependency declaration
    	- set to provided
    	- scala version set to scala.binary.version
    	- flink version set to project.version
    - applied checkstyle
    	- disabled method/parameter name rules for API classes
    - assigned flink-python-streaming to 'libraries' travis profile
    - copy streaming-python jar to /opt
    - change the name of the final jar to flink-streaming-python (previously flink-python)
    - replace maven-jar-plugin with maven-shade-plugin
    
    API:
    - PDS#map()/flat_map() now return PythonSingleOutputStreamOperator
    - renamed PDS#print() to PDS#output()
    	- print is a keyword in python and thus not usable in native python APIs
    - added PythonSingleOutputStreamOperator#name()
    - removed env#execute methods that accepted local execution argument as they are redundant due to environment factory methods
    - narrow visibility of *DataStream constructors
    
    Moved/Renamed:
    - made SerializerMap top-level class and renamed it to AdapterMap
    - Moved UtilityFunctions#adapt to AdapterMap class
    - renamed UtilityFunctions to InterpreterUtils
    - moved PythonobjectInputStream2 to SerializationUtils
    - renamed PythonObjectInputStream2 to SerialVersionOverridingPythonObjectInputStream
    
    Jython:
    - renamed InterpreterUtils#smartFunctionDeserialization to deserializeFunction
    - added generic return type to #deserializeFunction
    - #deserializeFunction uses static initialization flag to detect whether it has to load jython instead of waiting for exception to happen
    - removed file cleanup in #initAndExecPythonScript as it is the binders' responsibility
    
    Connectors:
    - replaced usage of deprecated serialiation schema interfaces
    - P(S/D)Schema#(de)serialize now fails with RuntimeException if schema deserialization fails
    - remove kafka code
    	- not really tested, and I'd rather tackle connector support in a follow-up
    
    Functions:
    - Introduced AbstractPythonUDF class for sharing RichRunction#open()/close() implementations
    - PythonOutputSelector now throws FlinkRuntimeException when failing during initialization
    - added generic return type to Serializationutils#deserializeObject
    - added new serializers for PyBoolean/-Float/-Integer/-Long/-String
    - PyObjectSerializer not properly fails when an exceptioin occurs
    - improved error printing
    - PythonCollector now typed to Object and properly converts non-PyObjects
    - jython functions that use a collector now have Object has output type
    	- otherwise you would get ClassCastException if jython returns something that isn't a PyObject
    
    PythonStreamBinder
    - adjusted to follow PythonPlanBinder structure
    - client-like main() exception handling
    - replaced Random usage with UUID.randomUIID()
    - now loads GlobalConfiguration
    - local/distributed tmp dir now configurable
    	- introduced PythonOptions
    - no longer generate plan.py but instead import it directly via the PythonInterpreter
    
    Environment:
    - Reworked static environment factory methods from PythonStreamExecutionEnvironment into a PythonEnvironmentFactory
    - program main() method now accepts a PythonEnvironmentFactory
    - directories are now passed properly to the environment instead of using static fields
    - removed PythonEnvironmentConfig
    . #registerJythonSerializers now static
    
    Examples:
    - move examples to flink-streaming-python
    - change examples location in dist to examples/python/streaming
    - replace ParameterTool usage with argparse
    - pass arguments via run instead of constructor
    - remove 'if __name__ == '__main__':' block
    - remove exception wrapping around source/sink creation
    - add WordCount example
    
    Tests:
    - removed 'if __name__ == '__main__':' blocks from tests since the condition is never fulfilled
    - removed python TestBase class
    - removed print statements from tests
    - standardized test job names
    - cleaned up PythonStreamBinderTest / made it more consistent with PythonPlanBinderTest
    - run_all_tests improvements
    	- stop after first failure
    	- print stacktrace on failure
    	- no longer relies on dirname() to get cwd but uses the module file location instead
    - added log4j properties file
    - added end-to-end test

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/zentol/flink review5886

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5333.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5333
    
----
commit a19c960b0d888926eac21287b1363e9cc3b77ce4
Author: Zohar Mizrahi <zo...@...>
Date:   2016-11-15T12:46:36Z

    [FLINK-5886] Python API for streaming applications

commit 82e92c8b4898c224f8ea7ea9333056a971a0a3f4
Author: Zohar Mizrahi <zo...@...>
Date:   2017-05-09T06:28:39Z

    [FLINK-5886] Improve 'fibonacci' example
    
    1) Descrease the number of source samples, so the program finishes earlier
    2) Add support for 'local' and 'cluster' execution modes (determined by
       input argument):
       'cluster' (default) - pyflink-stream.sh fibonacci.py
       'local'             - pyflink-stream.sh fibonacci.py - --local

commit fae67be3b9baf1028dc3d3c99b5d949e65fac813
Author: Zohar Mizrahi <zo...@...>
Date:   2017-05-22T22:52:55Z

    [FLINK-5886] Apply fixes following a pull request review

commit b528ec17124528396d235b244511813bd7e099c2
Author: Zohar Mizrahi <zo...@...>
Date:   2017-05-23T17:36:00Z

    [FLINK-5886] Apply additional fix following code review

commit 801d60a7b04b9ef6d6899ffc2ba08a73ff123cab
Author: zentol <ch...@...>
Date:   2018-01-15T12:03:42Z

    Various refactorings
    
    Changelog:
    
    General:
    - rebase branch to current master
    - incremented version to 1.5-SNAPSHOT
    - fixed kafka-connector dependency declaration
    	- set to provided
    	- scala version set to scala.binary.version
    	- flink version set to project.version
    - applied checkstyle
    	- disabled method/parameter name rules for API classes
    - assigned flink-python-streaming to 'libraries' travis profile
    - copy streaming-python jar to /opt
    - change the name of the final jar to flink-streaming-python (previously flink-python)
    - replace maven-jar-plugin with maven-shade-plugin
    
    API:
    - PDS#map()/flat_map() now return PythonSingleOutputStreamOperator
    - renamed PDS#print() to PDS#output()
    	- print is a keyword in python and thus not usable in native python APIs
    - added PythonSingleOutputStreamOperator#name()
    - removed env#execute methods that accepted local execution argument as they are redundant due to environment factory methods
    - narrow visibility of *DataStream constructors
    
    Moved/Renamed:
    - made SerializerMap top-level class and renamed it to AdapterMap
    - Moved UtilityFunctions#adapt to AdapterMap class
    - renamed UtilityFunctions to InterpreterUtils
    - moved PythonobjectInputStream2 to SerializationUtils
    - renamed PythonObjectInputStream2 to SerialVersionOverridingPythonObjectInputStream
    
    Jython:
    - renamed InterpreterUtils#smartFunctionDeserialization to deserializeFunction
    - added generic return type to #deserializeFunction
    - #deserializeFunction uses static initialization flag to detect whether it has to load jython instead of waiting for exception to happen
    - removed file cleanup in #initAndExecPythonScript as it is the binders' responsibility
    
    Connectors:
    - replaced usage of deprecated serialiation schema interfaces
    - P(S/D)Schema#(de)serialize now fails with RuntimeException if schema deserialization fails
    
    Functions:
    - Introduced AbstractPythonUDF class for sharing RichRunction#open()/close() implementations
    - PythonOutputSelector now throws FlinkRuntimeException when failing during initialization
    - added generic return type to Serializationutils#deserializeObject
    - added new serializers for PyBoolean/-Float/-Integer/-Long/-String
    - PyObjectSerializer not properly fails when an exceptioin occurs
    - improved error printing
    
    - PythonCollector now typed to Object and properly converts non-PyObjects
    - jython functions that use a collector now have Object has output type
    	- otherwise you would get ClassCastException if jython returns something that isn't a PyObject
    
    PythonStreamBinder
    - adjusted to follow PythonPlanBinder structure
    - client-like main() exception handling
    - replaced Random usage with UUID.randomUIID()
    - now loads GlobalConfiguration
    - local/distributed tmp dir now configurable
    	- introduced PythonOptions
    - no longer generate plan.py but instead import it directly via the PythonInterpreter
    
    Environment:
    - Reworked static environment factory methods from PythonStreamExecutionEnvironment into a PythonEnvironmentFactory
    - program main() method now accepts a PythonEnvironmentFactory
    - directories are now passed properly to the environment instead of using static fields
    - removed PythonEnvironmentConfig
    . #registerJythonSerializers now static
    
    Examples:
    - move examples to flink-streaming-python
    - change examples location in dist to examples/python/streaming
    - replace ParameterTool usage with argparse
    - pass arguments via run instead of constructor
    - remove 'if __name__ == '__main__':' block
    - remove exception wrapping around source/sink creation
    - add WordCount example
    
    Tests:
    - removed 'if __name__ == '__main__':' blocks from tests since the condition is never fulfilled
    - removed python TestBase class
    - removed print statements from tests
    - standardized test job names
    - cleaned up PythonStreamBinderTest / made it more consistent with PythonPlanBinderTest
    - run_all_tests improvements
    	- stop after first failure
    	- print stacktrace on failure
    	- no longer relies on dirname() to get cwd but uses the module file location instead
    - added log4j properties file
    - added end-to-end test

commit d928e02b62bed798882bc7cb33badb5fab78f71c
Author: zentol <ch...@...>
Date:   2018-01-22T12:20:18Z

    remove kafka code

----


---

[GitHub] flink issue #5333: [FLINK-5886] Python API for streaming applications

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5333
  
    merging.


---

[GitHub] flink issue #5333: [FLINK-5886] Python API for streaming applications

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5333
  
    @zohar-mizrahi FYI


---

[GitHub] flink issue #5333: [FLINK-5886] Python API for streaming applications

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5333
  
    Yup, running a modified script works properly without conflicts.


---

[GitHub] flink issue #5333: [FLINK-5886] Python API for streaming applications

Posted by zohar-mizrahi <gi...@git.apache.org>.
Github user zohar-mizrahi commented on the issue:

    https://github.com/apache/flink/pull/5333
  
    Just started to look at your changes and have one comment with respect to the plan.py - have you tried executing the same script twice, but on the second time change one line in the script (.e.g a map function)? Make sure the change takes place for the second run.
    (BTW - I'm not sure I'll be able to spend much time in the near future in reviewing the whole changes)


---

[GitHub] flink pull request #5333: [FLINK-5886] Python API for streaming applications

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5333#discussion_r163537435
  
    --- Diff: flink-dist/src/main/assemblies/bin.xml ---
    @@ -28,7 +28,7 @@ under the License.
     	<includeBaseDirectory>true</includeBaseDirectory>
     	<baseDirectory>flink-${project.version}</baseDirectory>
     
    -	<!-- Include flink-python.jar in lib/ -->
    +	<!-- Include flink-python.jar & flink-streaming-python.jar in lib/ -->
    --- End diff --
    
    this should be reverted


---

[GitHub] flink issue #5333: [FLINK-5886] Python API for streaming applications

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5333
  
    I'll double check to make sure it works.


---

[GitHub] flink pull request #5333: [FLINK-5886] Python API for streaming applications

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5333


---

[GitHub] flink issue #5333: [FLINK-5886] Python API for streaming applications

Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5333
  
    Given that each task has a separate jython instance (separate by a classloader) there shouldn't be any conflicts though.


---