Posted to user@flink.apache.org by Günter Hipler <gu...@bluewin.ch> on 2020/02/03 05:00:15 UTC
no suitable table factory for file sources
Hi,
I can't read from a file source using the sql-client tool.
I set up a simple test scenario with the configuration file in [1]
and get the error in [2] when starting the environment with
bin/sql-client.sh embedded -d gh/sql-client-conf.yaml -l lib
in the standard Flink 1.9.1 download distribution.
Reading the documentation at
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors
I expected the file system connector to be "built in".
I run into a similar issue while following the sql-training tutorial
from https://github.com/ververica/sql-training/wiki, where I changed the
sql-client-conf.yaml file used by the Docker container for the SQL client.
Thanks for any hints
Günter
[1]
################################################################################
# Define table sources here. See the Table API & SQL documentation for details.
################################################################################

tables:
  - name: Guenter
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: file:///home/swissbib/temp/trash/hello.txt
    format:
      type: csv
      # required: define the schema either by using type information
      schema: "ROW(test STRING)"
      # or use the table's schema
      derive-schema: true
      field-delimiter: ";"          # optional: field delimiter character (',' by default)
      line-delimiter: "\r\n"        # optional: line delimiter ("\n" by default;
                                    # otherwise "\r" or "\r\n" are allowed)
      quote-character: "'"          # optional: quote character for enclosing field
                                    # values ('"' by default)
      allow-comments: true          # optional: ignores comment lines that start with "#"
                                    # (disabled by default); if enabled, make sure to also
                                    # ignore parse errors to allow empty rows
      ignore-parse-errors: true     # optional: skip fields and rows with parse errors
                                    # instead of failing; fields are set to null in
                                    # case of errors
      array-element-delimiter: "|"  # optional: the array element delimiter string for
                                    # separating array and row element values (";" by default)
      escape-character: "\\"        # optional: escape character for escaping values
                                    # (disabled by default)
      null-literal: "n/a"           # optional: null literal string that is interpreted
                                    # as a null value (disabled by default)
#==============================================================================
# Execution properties
#==============================================================================

# Execution properties allow for changing the behavior of a table program.

execution:
  #planner: blink
  type: streaming                # 'batch' or 'streaming' execution
  result-mode: table             # 'changelog' or 'table' presentation of results
  parallelism: 1                 # parallelism of the program
  max-parallelism: 128           # maximum parallelism
  min-idle-state-retention: 0    # minimum idle state retention in ms
  max-idle-state-retention: 0    # maximum idle state retention in ms
#==============================================================================
# Deployment properties
#==============================================================================

# Deployment properties describe the cluster to which table programs are submitted.

deployment:
  type: standalone               # only the 'standalone' deployment is supported
  response-timeout: 5000         # general cluster communication timeout in ms
  gateway-address: ""            # (optional) address from cluster to gateway
  gateway-port: 0                # (optional) port from cluster to gateway
[2]
bin/sql-client.sh embedded -d gh/sql-client-conf.yaml -l lib
Reading default environment from:
file:/usr/local/swissbib/flink-1.9.1/gh/sql-client-conf.yaml
No session environment specified.
Validating current environment...
Exception in thread "main" org.apache.flink.table.client.SqlClientException:
The configured environment is invalid. Please check your environment files again.
    at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:147)
    at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
    at org.apache.flink.table.client.SqlClient.main(SqlClient.java:194)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
Could not create execution context.
    at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:562)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:382)
    at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:144)
    ... 2 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in the classpath.
Reason: No factory supports all properties.
The following properties are requested:
connector.path=file:///home/swissbib/temp/trash/hello.txt
connector.type=filesystem
format.allow-comments=true
format.array-element-delimiter=|
format.derive-schema=true
format.escape-character=\\
format.field-delimiter=;
format.ignore-parse-errors=true
format.line-delimiter=\r\n
format.null-literal=n/a
format.quote-character='
format.schema=ROW(test STRING)
format.type=csv
update-mode=append
The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
    at org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:370)
    at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:197)
    at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
    at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:114)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:265)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$new$1(ExecutionContext.java:144)
    at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
    at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:142)
    at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:558)
    ... 4 more
Re: no suitable table factory for file sources
Posted by Jingsong Li <ji...@gmail.com>.
Hi Günter,
The file system connector currently only supports the old CSV format (OldCsv) [1].
Your YAML, however, uses properties of the new CSV format, such as
"format.allow-comments". Please check the properties supported by the old CSV
format [2]; a sketch follows the links below.
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#file-system-connector
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#old-csv-format
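For illustration, here is a minimal source definition that sticks to old-CSV
properties (an untested sketch; the single VARCHAR field "test" and the ";"
delimiter are assumptions carried over from your example):

tables:
  - name: Guenter
    type: source-table
    update-mode: append
    connector:
      type: filesystem
      path: file:///home/swissbib/temp/trash/hello.txt
    schema:                        # table schema
      - name: test
        type: VARCHAR
    format:
      type: csv
      fields:                      # old CSV declares its format schema via 'fields'
        - name: test
          type: VARCHAR
      field-delimiter: ";"         # optional, ',' by default
      comment-prefix: "#"          # optional, old-CSV counterpart of 'allow-comments'
      ignore-parse-errors: true    # optional, skip records with parse errors instead of failing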
Best,
Jingsong Lee