Posted to user@flink.apache.org by Yik San Chan <ev...@gmail.com> on 2021/03/15 05:25:46 UTC

Got “pyflink.util.exceptions.TableException: findAndCreateTableSource failed.” when running PyFlink example

(The question is cross-posted on StackOverflow:
https://stackoverflow.com/questions/66632765/got-pyflink-util-exceptions-tableexception-findandcreatetablesource-failed-w)

I am running the PyFlink program below (copied from
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table_api_tutorial.html):

```python
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

t_env.connect(FileSystem().path('/tmp/input')) \
    .with_format(OldCsv()
                 .field('word', DataTypes.STRING())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())) \
    .create_temporary_table('mySource')

t_env.connect(FileSystem().path('/tmp/output')) \
    .with_format(OldCsv()
                 .field_delimiter('\t')
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .with_schema(Schema()
                 .field('word', DataTypes.STRING())
                 .field('count', DataTypes.BIGINT())) \
    .create_temporary_table('mySink')

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()
```

To verify it works, I did the following in order:

1. Run `echo -e "flink\npyflink\nflink" > /tmp/input`
2. Run `python WordCount.py`
3. Run `cat /tmp/output` and find the expected output

Then I changed my PyFlink program a bit to prefer SQL over the Table API,
but it doesn't work:

```python
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, DataTypes, BatchTableEnvironment
from pyflink.table.descriptors import Schema, OldCsv, FileSystem
from pyflink.table.expressions import lit

exec_env = ExecutionEnvironment.get_execution_environment()
exec_env.set_parallelism(1)
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

my_source_ddl = """
    create table mySource (
        word VARCHAR
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/input'
    )
"""

my_sink_ddl = """
    create table mySink (
        word VARCHAR,
        `count` BIGINT
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/output'
    )
"""

t_env.sql_update(my_source_ddl)
t_env.sql_update(my_sink_ddl)

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
   .select(tab.word, lit(1).count) \
   .execute_insert('mySink').wait()
```

Here's the error:

```
Traceback (most recent call last):
  File "WordCount.py", line 38, in <module>
    .execute_insert('mySink').wait()
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/table/table.py", line 864, in execute_insert
    return TableResult(self._j_table.executeInsert(table_path, overwrite))
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/anaconda3/envs/pyflink-quickstart/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 162, in deco
    raise java_exception
pyflink.util.exceptions.TableException: findAndCreateTableSink failed.
    at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSink(TableFactoryUtil.java:87)
    at org.apache.flink.table.api.internal.TableEnvImpl.getTableSink(TableEnvImpl.scala:1097)
    at org.apache.flink.table.api.internal.TableEnvImpl.org$apache$flink$table$api$internal$TableEnvImpl$$writeToSinkAndTranslate(TableEnvImpl.scala:929)
    at org.apache.flink.table.api.internal.TableEnvImpl$$anonfun$1.apply(TableEnvImpl.scala:556)
    at org.apache.flink.table.api.internal.TableEnvImpl$$anonfun$1.apply(TableEnvImpl.scala:554)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.api.internal.TableEnvImpl.executeInternal(TableEnvImpl.scala:554)
    at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:572)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
    at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
    at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
```

I wonder what's wrong with my new program. Thanks!

Re: Got “pyflink.util.exceptions.TableException: findAndCreateTableSource failed.” when running PyFlink example

Posted by Yik San Chan <ev...@gmail.com>.
Thanks for your help, it works.

Best,
Yik San Chan


Re: Got “pyflink.util.exceptions.TableException: findAndCreateTableSource failed.” when running PyFlink example

Posted by Xingbo Huang <hx...@gmail.com>.
Hi,

The problem is that the legacy DataSet planner you are using does not support
the FileSystem connector you declared. You can use the Blink planner to
achieve what you need:

```python
from pyflink.table import BatchTableEnvironment, EnvironmentSettings
from pyflink.table.expressions import lit

t_env = BatchTableEnvironment.create(
    environment_settings=EnvironmentSettings.new_instance()
    .in_batch_mode().use_blink_planner().build())
t_env._j_tenv.getPlanner().getExecEnv().setParallelism(1)

my_source_ddl = """
    create table mySource (
        word VARCHAR
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/input'
    )
"""

my_sink_ddl = """
    create table mySink (
        word VARCHAR,
        `count` BIGINT
    ) with (
        'connector' = 'filesystem',
        'format' = 'csv',
        'path' = '/tmp/output'
    )
"""

t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)

tab = t_env.from_path('mySource')
tab.group_by(tab.word) \
    .select(tab.word, lit(1).count) \
    .execute_insert('mySink').wait()
```

Best,
Xingbo

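One refinement to the fix above: `t_env._j_tenv.getPlanner().getExecEnv().setParallelism(1)` reaches into the Java planner internals. A less intrusive way to get the same effect is to set Flink's standard `parallelism.default` option through the table configuration. A minimal sketch, assuming Flink 1.12 and the Blink batch planner as above:

```python
from pyflink.table import BatchTableEnvironment, EnvironmentSettings

# Blink planner in batch mode, as in the fix above.
t_env = BatchTableEnvironment.create(
    environment_settings=EnvironmentSettings.new_instance()
    .in_batch_mode().use_blink_planner().build())

# Set the default job parallelism through the public configuration object
# rather than through planner internals. "parallelism.default" is Flink's
# standard config key for the default parallelism.
t_env.get_config().get_configuration().set_string("parallelism.default", "1")
```

The rest of the program (the two DDL statements and the group-by insert) is unchanged.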