You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Manas Kale <ma...@gmail.com> on 2020/08/28 12:47:42 UTC

PyFlink cluster runtime issue

Hi,
I am trying to deploy a pyFlink application on a local cluster. I am able
to run my application without any problems if I execute it as a normal
python program using the command :
python myApplication.py
My pyFlink version is __version__ = "1.11.0".
I had installed this pyFlink through conda/pip (don't remember which).

Per instructions given in [1] I have ensured that running the command
"python" gets me to a python 3.7 shell with pyFlink installed.
I have also ensured my local Flink cluster version is 1.11.0 (same as
above).
However, if I execute the application using the command:
bin/flink run -py myApplication.py

I get the error:

Traceback (most recent call last):
 File "basic_streaming_job.py", line 65, in <module>
   main()
 File "basic_streaming_job.py", line 43, in main
   """)
 File
"/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/table/
table_environment.py", line 543, in execute_sql
 File
"/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j
/java_gateway.py", line 1286, in __call__
 File
"/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/util/e
xceptions.py", line 147, in deco
 File
"/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j
/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o5.executeSql.
: org.apache.flink.table.api.ValidationException: Unable to create a source
for reading table
'default_catalog.default_database.raw_message'.

Table options are:

'connector'='kafka'
'format'='json'
'properties.bootstrap.servers'='localhost:9092'
'topic'='basic_features_normalized'
       at
org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:12
5)
       at
org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(Catal
ogSourceTable.scala:135)
.....

The offending table schema in question :

CREATE TABLE {INPUT_TABLE} (
    monitorId STRING,
    deviceId STRING,
    state INT,
    feature_1 DOUBLE,
    feature_2 DOUBLE,
    feature_3 DOUBLE,
    feature_4 DOUBLE,
    feature_5 DOUBLE,
    feature_6 DOUBLE,
    time_str TIMESTAMP(3),
    WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = '{INPUT_TOPIC}',
    'properties.bootstrap.servers' = '{KAFKA}',
    'format' = 'json'
)

Clearly, even though my standalone pyFlink version and cluster Flink
versions are the same, something is different with the cluster runtime.
What could that be?


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#job-submission-examples

Re: PyFlink cluster runtime issue

Posted by Manas Kale <ma...@gmail.com>.
Ok, thank you!

On Sat, 29 Aug, 2020, 4:07 pm Xingbo Huang, <hx...@gmail.com> wrote:

> Hi Manas,
>
> We can't submit a pyflink job through flink web currently. The only way
> currently to submit a pyFlink job is through the command line.
>
> Best,
> Xingbo
>
> Manas Kale <ma...@gmail.com> 于2020年8月29日周六 下午12:51写道:
>
>> Hi Xingbo,
>> Thanks, that worked. Just to make sure, the only way currently to submit
>> a pyFlink job is through the command line right? Can I do that through the
>> GUI?
>>
>> On Fri, Aug 28, 2020 at 8:17 PM Xingbo Huang <hx...@gmail.com> wrote:
>>
>>> Hi Manas,
>>>
>>> I think you forgot to add kafka jar[1] dependency. You can use the
>>> argument -j of the command line[2] or the Python Table API to specify the
>>> jar. For details about the APIs of adding Java dependency, you can refer to
>>> the relevant documentation[3]
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#dependencies
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html#job-submission-examples
>>> [3]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/dependency_management.html#java-dependency
>>>
>>> Best,
>>> Xingbo
>>>
>>> Manas Kale <ma...@gmail.com> 于2020年8月28日周五 下午9:06写道:
>>>
>>>> Hi,
>>>> I am trying to deploy a pyFlink application on a local cluster. I am
>>>> able to run my application without any problems if I execute it as a normal
>>>> python program using the command :
>>>> python myApplication.py
>>>> My pyFlink version is __version__ = "1.11.0".
>>>> I had installed this pyFlink through conda/pip (don't remember which).
>>>>
>>>> Per instructions given in [1] I have ensured that running the command
>>>> "python" gets me to a python 3.7 shell with pyFlink installed.
>>>> I have also ensured my local Flink cluster version is 1.11.0 (same as
>>>> above).
>>>> However, if I execute the application using the command:
>>>> bin/flink run -py myApplication.py
>>>>
>>>> I get the error:
>>>>
>>>> Traceback (most recent call last):
>>>>  File "basic_streaming_job.py", line 65, in <module>
>>>>    main()
>>>>  File "basic_streaming_job.py", line 43, in main
>>>>    """)
>>>>  File
>>>> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/table/
>>>> table_environment.py", line 543, in execute_sql
>>>>  File
>>>> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j
>>>> /java_gateway.py", line 1286, in __call__
>>>>  File
>>>> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/util/e
>>>> xceptions.py", line 147, in deco
>>>>  File
>>>> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j
>>>> /protocol.py", line 328, in get_return_value
>>>> py4j.protocol.Py4JJavaError: An error occurred while calling
>>>> o5.executeSql.
>>>> : org.apache.flink.table.api.ValidationException: Unable to create a
>>>> source for reading table
>>>> 'default_catalog.default_database.raw_message'.
>>>>
>>>> Table options are:
>>>>
>>>> 'connector'='kafka'
>>>> 'format'='json'
>>>> 'properties.bootstrap.servers'='localhost:9092'
>>>> 'topic'='basic_features_normalized'
>>>>        at
>>>> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:12
>>>> 5)
>>>>        at
>>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(Catal
>>>> ogSourceTable.scala:135)
>>>> .....
>>>>
>>>> The offending table schema in question :
>>>>
>>>> CREATE TABLE {INPUT_TABLE} (
>>>>     monitorId STRING,
>>>>     deviceId STRING,
>>>>     state INT,
>>>>     feature_1 DOUBLE,
>>>>     feature_2 DOUBLE,
>>>>     feature_3 DOUBLE,
>>>>     feature_4 DOUBLE,
>>>>     feature_5 DOUBLE,
>>>>     feature_6 DOUBLE,
>>>>     time_str TIMESTAMP(3),
>>>>     WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
>>>> ) WITH (
>>>>     'connector' = 'kafka',
>>>>     'topic' = '{INPUT_TOPIC}',
>>>>     'properties.bootstrap.servers' = '{KAFKA}',
>>>>     'format' = 'json'
>>>> )
>>>>
>>>> Clearly, even though my standalone pyFlink version and cluster Flink
>>>> versions are the same, something is different with the cluster runtime.
>>>> What could that be?
>>>>
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#job-submission-examples
>>>>
>>>

Re: PyFlink cluster runtime issue

Posted by Xingbo Huang <hx...@gmail.com>.
Hi Manas,

We can't submit a pyflink job through flink web currently. The only way
currently to submit a pyFlink job is through the command line.

Best,
Xingbo

Manas Kale <ma...@gmail.com> 于2020年8月29日周六 下午12:51写道:

> Hi Xingbo,
> Thanks, that worked. Just to make sure, the only way currently to submit a
> pyFlink job is through the command line right? Can I do that through the
> GUI?
>
> On Fri, Aug 28, 2020 at 8:17 PM Xingbo Huang <hx...@gmail.com> wrote:
>
>> Hi Manas,
>>
>> I think you forgot to add kafka jar[1] dependency. You can use the
>> argument -j of the command line[2] or the Python Table API to specify the
>> jar. For details about the APIs of adding Java dependency, you can refer to
>> the relevant documentation[3]
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#dependencies
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html#job-submission-examples
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/dependency_management.html#java-dependency
>>
>> Best,
>> Xingbo
>>
>> Manas Kale <ma...@gmail.com> 于2020年8月28日周五 下午9:06写道:
>>
>>> Hi,
>>> I am trying to deploy a pyFlink application on a local cluster. I am
>>> able to run my application without any problems if I execute it as a normal
>>> python program using the command :
>>> python myApplication.py
>>> My pyFlink version is __version__ = "1.11.0".
>>> I had installed this pyFlink through conda/pip (don't remember which).
>>>
>>> Per instructions given in [1] I have ensured that running the command
>>> "python" gets me to a python 3.7 shell with pyFlink installed.
>>> I have also ensured my local Flink cluster version is 1.11.0 (same as
>>> above).
>>> However, if I execute the application using the command:
>>> bin/flink run -py myApplication.py
>>>
>>> I get the error:
>>>
>>> Traceback (most recent call last):
>>>  File "basic_streaming_job.py", line 65, in <module>
>>>    main()
>>>  File "basic_streaming_job.py", line 43, in main
>>>    """)
>>>  File
>>> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/table/
>>> table_environment.py", line 543, in execute_sql
>>>  File
>>> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j
>>> /java_gateway.py", line 1286, in __call__
>>>  File
>>> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/util/e
>>> xceptions.py", line 147, in deco
>>>  File
>>> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j
>>> /protocol.py", line 328, in get_return_value
>>> py4j.protocol.Py4JJavaError: An error occurred while calling
>>> o5.executeSql.
>>> : org.apache.flink.table.api.ValidationException: Unable to create a
>>> source for reading table
>>> 'default_catalog.default_database.raw_message'.
>>>
>>> Table options are:
>>>
>>> 'connector'='kafka'
>>> 'format'='json'
>>> 'properties.bootstrap.servers'='localhost:9092'
>>> 'topic'='basic_features_normalized'
>>>        at
>>> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:12
>>> 5)
>>>        at
>>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(Catal
>>> ogSourceTable.scala:135)
>>> .....
>>>
>>> The offending table schema in question :
>>>
>>> CREATE TABLE {INPUT_TABLE} (
>>>     monitorId STRING,
>>>     deviceId STRING,
>>>     state INT,
>>>     feature_1 DOUBLE,
>>>     feature_2 DOUBLE,
>>>     feature_3 DOUBLE,
>>>     feature_4 DOUBLE,
>>>     feature_5 DOUBLE,
>>>     feature_6 DOUBLE,
>>>     time_str TIMESTAMP(3),
>>>     WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
>>> ) WITH (
>>>     'connector' = 'kafka',
>>>     'topic' = '{INPUT_TOPIC}',
>>>     'properties.bootstrap.servers' = '{KAFKA}',
>>>     'format' = 'json'
>>> )
>>>
>>> Clearly, even though my standalone pyFlink version and cluster Flink
>>> versions are the same, something is different with the cluster runtime.
>>> What could that be?
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#job-submission-examples
>>>
>>

Re: PyFlink cluster runtime issue

Posted by Manas Kale <ma...@gmail.com>.
Hi Xingbo,
Thanks, that worked. Just to make sure, the only way currently to submit a
pyFlink job is through the command line right? Can I do that through the
GUI?

On Fri, Aug 28, 2020 at 8:17 PM Xingbo Huang <hx...@gmail.com> wrote:

> Hi Manas,
>
> I think you forgot to add kafka jar[1] dependency. You can use the
> argument -j of the command line[2] or the Python Table API to specify the
> jar. For details about the APIs of adding Java dependency, you can refer to
> the relevant documentation[3]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#dependencies
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html#job-submission-examples
> [3]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/dependency_management.html#java-dependency
>
> Best,
> Xingbo
>
> Manas Kale <ma...@gmail.com> 于2020年8月28日周五 下午9:06写道:
>
>> Hi,
>> I am trying to deploy a pyFlink application on a local cluster. I am able
>> to run my application without any problems if I execute it as a normal
>> python program using the command :
>> python myApplication.py
>> My pyFlink version is __version__ = "1.11.0".
>> I had installed this pyFlink through conda/pip (don't remember which).
>>
>> Per instructions given in [1] I have ensured that running the command
>> "python" gets me to a python 3.7 shell with pyFlink installed.
>> I have also ensured my local Flink cluster version is 1.11.0 (same as
>> above).
>> However, if I execute the application using the command:
>> bin/flink run -py myApplication.py
>>
>> I get the error:
>>
>> Traceback (most recent call last):
>>  File "basic_streaming_job.py", line 65, in <module>
>>    main()
>>  File "basic_streaming_job.py", line 43, in main
>>    """)
>>  File
>> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/table/
>> table_environment.py", line 543, in execute_sql
>>  File
>> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j
>> /java_gateway.py", line 1286, in __call__
>>  File
>> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/util/e
>> xceptions.py", line 147, in deco
>>  File
>> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j
>> /protocol.py", line 328, in get_return_value
>> py4j.protocol.Py4JJavaError: An error occurred while calling
>> o5.executeSql.
>> : org.apache.flink.table.api.ValidationException: Unable to create a
>> source for reading table
>> 'default_catalog.default_database.raw_message'.
>>
>> Table options are:
>>
>> 'connector'='kafka'
>> 'format'='json'
>> 'properties.bootstrap.servers'='localhost:9092'
>> 'topic'='basic_features_normalized'
>>        at
>> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:12
>> 5)
>>        at
>> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(Catal
>> ogSourceTable.scala:135)
>> .....
>>
>> The offending table schema in question :
>>
>> CREATE TABLE {INPUT_TABLE} (
>>     monitorId STRING,
>>     deviceId STRING,
>>     state INT,
>>     feature_1 DOUBLE,
>>     feature_2 DOUBLE,
>>     feature_3 DOUBLE,
>>     feature_4 DOUBLE,
>>     feature_5 DOUBLE,
>>     feature_6 DOUBLE,
>>     time_str TIMESTAMP(3),
>>     WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
>> ) WITH (
>>     'connector' = 'kafka',
>>     'topic' = '{INPUT_TOPIC}',
>>     'properties.bootstrap.servers' = '{KAFKA}',
>>     'format' = 'json'
>> )
>>
>> Clearly, even though my standalone pyFlink version and cluster Flink
>> versions are the same, something is different with the cluster runtime.
>> What could that be?
>>
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#job-submission-examples
>>
>

Re: PyFlink cluster runtime issue

Posted by Xingbo Huang <hx...@gmail.com>.
Hi Manas,

I think you forgot to add kafka jar[1] dependency. You can use the argument
-j of the command line[2] or the Python Table API to specify the jar. For
details about the APIs of adding Java dependency, you can refer to the
relevant documentation[3]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/kafka.html#dependencies
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/cli.html#job-submission-examples
[3]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/user-guide/table/dependency_management.html#java-dependency

Best,
Xingbo

Manas Kale <ma...@gmail.com> 于2020年8月28日周五 下午9:06写道:

> Hi,
> I am trying to deploy a pyFlink application on a local cluster. I am able
> to run my application without any problems if I execute it as a normal
> python program using the command :
> python myApplication.py
> My pyFlink version is __version__ = "1.11.0".
> I had installed this pyFlink through conda/pip (don't remember which).
>
> Per instructions given in [1] I have ensured that running the command
> "python" gets me to a python 3.7 shell with pyFlink installed.
> I have also ensured my local Flink cluster version is 1.11.0 (same as
> above).
> However, if I execute the application using the command:
> bin/flink run -py myApplication.py
>
> I get the error:
>
> Traceback (most recent call last):
>  File "basic_streaming_job.py", line 65, in <module>
>    main()
>  File "basic_streaming_job.py", line 43, in main
>    """)
>  File
> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/table/
> table_environment.py", line 543, in execute_sql
>  File
> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j
> /java_gateway.py", line 1286, in __call__
>  File
> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/pyflink.zip/pyflink/util/e
> xceptions.py", line 147, in deco
>  File
> "/home/manas/IU_workspace/Flink_POC/flink-1.11.0/opt/python/py4j-0.10.8.1-src.zip/py4j
> /protocol.py", line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o5.executeSql.
> : org.apache.flink.table.api.ValidationException: Unable to create a
> source for reading table
> 'default_catalog.default_database.raw_message'.
>
> Table options are:
>
> 'connector'='kafka'
> 'format'='json'
> 'properties.bootstrap.servers'='localhost:9092'
> 'topic'='basic_features_normalized'
>        at
> org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:12
> 5)
>        at
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(Catal
> ogSourceTable.scala:135)
> .....
>
> The offending table schema in question :
>
> CREATE TABLE {INPUT_TABLE} (
>     monitorId STRING,
>     deviceId STRING,
>     state INT,
>     feature_1 DOUBLE,
>     feature_2 DOUBLE,
>     feature_3 DOUBLE,
>     feature_4 DOUBLE,
>     feature_5 DOUBLE,
>     feature_6 DOUBLE,
>     time_str TIMESTAMP(3),
>     WATERMARK FOR time_str AS time_str - INTERVAL '{WATERMARK_DELAY}' SECOND
> ) WITH (
>     'connector' = 'kafka',
>     'topic' = '{INPUT_TOPIC}',
>     'properties.bootstrap.servers' = '{KAFKA}',
>     'format' = 'json'
> )
>
> Clearly, even though my standalone pyFlink version and cluster Flink
> versions are the same, something is different with the cluster runtime.
> What could that be?
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#job-submission-examples
>