Posted to issues@flink.apache.org by zentol <gi...@git.apache.org> on 2018/01/22 14:50:37 UTC
[GitHub] flink pull request #5333: Review5886
GitHub user zentol opened a pull request:
https://github.com/apache/flink/pull/5333
Review5886
This PR is an extension of #3838, resolving all issues that I found during the review. The change log below is roughly grouped into categories to provide a better overview.
Change log:
General:
- rebase branch to current master
- incremented version to 1.5-SNAPSHOT
- fixed kafka-connector dependency declaration
- set to provided
- scala version set to scala.binary.version
- flink version set to project.version
- applied checkstyle
- disabled method/parameter name rules for API classes
- assigned flink-python-streaming to 'libraries' travis profile
- copy streaming-python jar to /opt
- change the name of the final jar to flink-streaming-python (previously flink-python)
- replace maven-jar-plugin with maven-shade-plugin
API:
- PDS#map()/flat_map() now return PythonSingleOutputStreamOperator
- renamed PDS#print() to PDS#output()
- print is a keyword in python and thus not usable in native python APIs
- added PythonSingleOutputStreamOperator#name()
- removed env#execute methods that accepted a local-execution argument, as they are redundant given the environment factory methods
- narrow visibility of *DataStream constructors
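The API shape described above — map()/flat_map() returning an operator type that adds name(), and print() renamed to output() because 'print' is a keyword in Python 2 (which Jython implements) — can be sketched with stand-in classes. All class bodies below are hypothetical illustrations, not the actual Flink implementation:

```python
class PythonDataStream:
    """Hypothetical stand-in for the PDS class; not the real Flink API."""

    def __init__(self, ops=None):
        self.ops = ops if ops is not None else []

    def map(self, fn):
        # map()/flat_map() return the richer operator type so name() can chain
        return PythonSingleOutputStreamOperator(self.ops + [("map", fn)])

    def output(self):
        # renamed from print(): 'print' is a keyword in Python 2 / Jython,
        # so it cannot be used as a method name in native Python code
        return self.ops


class PythonSingleOutputStreamOperator(PythonDataStream):
    def name(self, op_name):
        # attach a name to the most recent operator and keep the fluent chain
        self.ops[-1] = self.ops[-1] + (op_name,)
        return self


ops = PythonDataStream().map(lambda x: x + 1).name("increment").output()
```

The point of returning the subtype is that naming stays a chainable call instead of requiring a separate statement.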
Moved/Renamed:
- made SerializerMap a top-level class and renamed it to AdapterMap
- Moved UtilityFunctions#adapt to AdapterMap class
- renamed UtilityFunctions to InterpreterUtils
- moved PythonObjectInputStream2 to SerializationUtils
- renamed PythonObjectInputStream2 to SerialVersionOverridingPythonObjectInputStream
Jython:
- renamed InterpreterUtils#smartFunctionDeserialization to deserializeFunction
- added generic return type to #deserializeFunction
- #deserializeFunction uses a static initialization flag to detect whether it has to load jython, instead of waiting for an exception to happen
- removed file cleanup in #initAndExecPythonScript as it is the binder's responsibility
Connectors:
- replaced usage of deprecated serialization schema interfaces
- P(S/D)Schema#(de)serialize now fails with RuntimeException if schema deserialization fails
- remove kafka code
- not really tested, and I'd rather tackle connector support in a follow-up
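The fail-with-RuntimeException behaviour described for P(S/D)Schema can be sketched like this (a Python illustration, with pickle standing in for the actual schema format; in the Java code the wrapper is a RuntimeException):

```python
import pickle


def deserialize_schema(raw_bytes):
    # surface failures as an unchecked RuntimeError instead of leaking the
    # low-level deserialization exception to the caller
    try:
        return pickle.loads(raw_bytes)
    except Exception as exc:
        raise RuntimeError("schema deserialization failed") from exc


schema = deserialize_schema(pickle.dumps({"fields": ["a", "b"]}))
```

Wrapping in a single well-known exception type gives callers one failure mode to handle while chaining keeps the original cause available.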
Functions:
- Introduced AbstractPythonUDF class for sharing RichFunction#open()/close() implementations
- PythonOutputSelector now throws FlinkRuntimeException when failing during initialization
- added generic return type to SerializationUtils#deserializeObject
- added new serializers for PyBoolean/-Float/-Integer/-Long/-String
- PyObjectSerializer now properly fails when an exception occurs
- improved error printing
- PythonCollector now typed to Object and properly converts non-PyObjects
- jython functions that use a collector now have Object as output type
- otherwise you would get ClassCastException if jython returns something that isn't a PyObject
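The collector behaviour above — accepting any object and converting values that are not already PyObjects — can be mimicked with a small stand-in (PyObjectStub is hypothetical; the real code wraps values in Jython PyObject instances):

```python
class PyObjectStub:
    """Hypothetical stand-in for Jython's PyObject wrapper type."""

    def __init__(self, value):
        self.value = value


class PythonCollector:
    def __init__(self):
        self.collected = []

    def collect(self, record):
        # accept plain objects and wrap them, so downstream code never
        # sees a bare value where a PyObject is expected
        if not isinstance(record, PyObjectStub):
            record = PyObjectStub(record)
        self.collected.append(record)


c = PythonCollector()
c.collect(42)                 # plain value: gets wrapped
c.collect(PyObjectStub("x"))  # already wrapped: passed through
```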
PythonStreamBinder:
- adjusted to follow PythonPlanBinder structure
- client-like main() exception handling
- replaced Random usage with UUID.randomUUID()
- now loads GlobalConfiguration
- local/distributed tmp dir now configurable
- introduced PythonOptions
- no longer generate plan.py but instead import it directly via the PythonInterpreter
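Importing the user script directly instead of generating an intermediate plan.py can be illustrated with importlib (a sketch of the idea in plain Python; the binder itself does this through the embedded Jython PythonInterpreter):

```python
import importlib.util
import os
import tempfile


def import_script(path):
    # load the user's script as a module straight from its file location,
    # instead of writing out a generated plan.py and executing that
    spec = importlib.util.spec_from_file_location("user_plan", path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


# demo: write a tiny script to a temp dir and import it directly
script = os.path.join(tempfile.mkdtemp(), "plan_demo.py")
with open(script, "w") as f:
    f.write("ANSWER = 6 * 7\n")
module = import_script(script)
```

Importing directly removes a code-generation step and keeps error line numbers pointing at the user's own file.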
Environment:
- Reworked static environment factory methods from PythonStreamExecutionEnvironment into a PythonEnvironmentFactory
- program main() method now accepts a PythonEnvironmentFactory
- directories are now passed properly to the environment instead of using static fields
- removed PythonEnvironmentConfig
- #registerJythonSerializers now static
Examples:
- move examples to flink-streaming-python
- change examples location in dist to examples/python/streaming
- replace ParameterTool usage with argparse
- pass arguments via run instead of constructor
- remove 'if __name__ == '__main__':' block
- remove exception wrapping around source/sink creation
- add WordCount example
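Swapping ParameterTool for argparse looks roughly like this on the Python side (a minimal sketch; the flag names here are illustrative, not necessarily those used in the examples):

```python
import argparse


def parse_args(argv):
    # argparse is the idiomatic Python replacement for Flink's ParameterTool
    parser = argparse.ArgumentParser(description="word count example")
    parser.add_argument("--input", default=None, help="input path")
    parser.add_argument("--local", action="store_true",
                        help="run with a local execution environment")
    return parser.parse_args(argv)


args = parse_args(["--input", "words.txt", "--local"])
```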
Tests:
- removed 'if __name__ == '__main__':' blocks from tests since the condition is never fulfilled
- removed python TestBase class
- removed print statements from tests
- standardized test job names
- cleaned up PythonStreamBinderTest / made it more consistent with PythonPlanBinderTest
- run_all_tests improvements
- stop after first failure
- print stacktrace on failure
- no longer relies on dirname() to get cwd but uses the module file location instead
- added log4j properties file
- added end-to-end test
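Resolving paths from the module file location instead of dirname()/cwd comes down to a small helper (a generic sketch, not the actual test code; in a real module the first argument would be __file__):

```python
import os


def resolve_relative(module_file, relative):
    # anchor at the module's file location rather than os.getcwd(), so the
    # result is stable no matter where the test runner was started from
    base = os.path.dirname(os.path.abspath(module_file))
    return os.path.join(base, relative)


path = resolve_relative("/opt/project/tests/test_job.py", "data/input.txt")
```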
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/zentol/flink review5886
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5333.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5333
----
commit a19c960b0d888926eac21287b1363e9cc3b77ce4
Author: Zohar Mizrahi <zo...@...>
Date: 2016-11-15T12:46:36Z
[FLINK-5886] Python API for streaming applications
commit 82e92c8b4898c224f8ea7ea9333056a971a0a3f4
Author: Zohar Mizrahi <zo...@...>
Date: 2017-05-09T06:28:39Z
[FLINK-5886] Improve 'fibonacci' example
1) Decrease the number of source samples, so the program finishes earlier
2) Add support for 'local' and 'cluster' execution modes (determined by
input argument):
'cluster' (default) - pyflink-stream.sh fibonacci.py
'local' - pyflink-stream.sh fibonacci.py - --local
commit fae67be3b9baf1028dc3d3c99b5d949e65fac813
Author: Zohar Mizrahi <zo...@...>
Date: 2017-05-22T22:52:55Z
[FLINK-5886] Apply fixes following a pull request review
commit b528ec17124528396d235b244511813bd7e099c2
Author: Zohar Mizrahi <zo...@...>
Date: 2017-05-23T17:36:00Z
[FLINK-5886] Apply additional fix following code review
commit 801d60a7b04b9ef6d6899ffc2ba08a73ff123cab
Author: zentol <ch...@...>
Date: 2018-01-15T12:03:42Z
Various refactorings
Changelog:
(identical to the changelog in the PR description above, minus the 'remove kafka code' entries; the Kafka code was removed by the follow-up commit below)
commit d928e02b62bed798882bc7cb33badb5fab78f71c
Author: zentol <ch...@...>
Date: 2018-01-22T12:20:18Z
remove kafka code
----
---
[GitHub] flink issue #5333: [FLINK-5886] Python API for streaming applications
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/5333
merging.
---
[GitHub] flink issue #5333: [FLINK-5886] Python API for streaming applications
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/5333
@zohar-mizrahi FYI
---
[GitHub] flink issue #5333: [FLINK-5886] Python API for streaming applications
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/5333
Yup, running a modified script works properly without conflicts.
---
[GitHub] flink issue #5333: [FLINK-5886] Python API for streaming applications
Posted by zohar-mizrahi <gi...@git.apache.org>.
Github user zohar-mizrahi commented on the issue:
https://github.com/apache/flink/pull/5333
Just started to look at your changes and have one comment with respect to the plan.py - have you tried executing the same script twice, but on the second time change one line in the script (e.g. a map function)? Make sure the change takes place for the second run.
(BTW - I'm not sure I'll be able to spend much time in the near future reviewing all the changes)
---
[GitHub] flink pull request #5333: [FLINK-5886] Python API for streaming applications
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5333#discussion_r163537435
--- Diff: flink-dist/src/main/assemblies/bin.xml ---
@@ -28,7 +28,7 @@ under the License.
<includeBaseDirectory>true</includeBaseDirectory>
<baseDirectory>flink-${project.version}</baseDirectory>
- <!-- Include flink-python.jar in lib/ -->
+ <!-- Include flink-python.jar & flink-streaming-python.jar in lib/ -->
--- End diff --
this should be reverted
---
[GitHub] flink issue #5333: [FLINK-5886] Python API for streaming applications
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/5333
I'll double check to make sure it works.
---
[GitHub] flink pull request #5333: [FLINK-5886] Python API for streaming applications
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/5333
---
[GitHub] flink issue #5333: [FLINK-5886] Python API for streaming applications
Posted by zentol <gi...@git.apache.org>.
Github user zentol commented on the issue:
https://github.com/apache/flink/pull/5333
Given that each task has a separate jython instance (separated by a classloader), there shouldn't be any conflicts though.
---