Posted to user@flink.apache.org by Günter Hipler <gu...@bluewin.ch> on 2020/02/03 05:00:15 UTC

no suitable table factory for file sources

Hi,

I can't read from a file source using the sql-client tool.

I set up a simple test scenario with the configuration file in [1].

I'm getting the error in [2] when starting the environment with
bin/sql-client.sh embedded -d gh/sql-client-conf.yaml -l lib
in a standard Flink 1.9.1 download.

Reading the documentation at
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors
I expected the file system connector to be built in.

I'm running into a similar issue while following the sql-training tutorial
at https://github.com/ververica/sql-training/wiki, where I modified the
sql-client-conf.yaml file used by the SQL client's Docker container.

Thanks for any hints

Günter




[1]

################################################################################
# Define table sources here. See the Table API & SQL documentation for details.

tables:

   - name: Guenter
     type: source-table
     update-mode: append
     connector:
       type: filesystem
       path: file:///home/swissbib/temp/trash/hello.txt

     format:
       type: csv
       # required: define the schema either by using type information
       schema: "ROW(test STRING)"

       # or use the table's schema
       derive-schema: true

       field-delimiter: ";"         # optional: field delimiter character (',' by default)
       line-delimiter: "\r\n"       # optional: line delimiter ("\n" by default; otherwise "\r" or "\r\n" are allowed)
       quote-character: "'"         # optional: quote character for enclosing field values ('"' by default)
       allow-comments: true         # optional: ignores comment lines that start with "#" (disabled by default);
                                    #   if enabled, make sure to also ignore parse errors to allow empty rows
       ignore-parse-errors: true    # optional: skip fields and rows with parse errors instead of failing;
                                    #   fields are set to null in case of errors
       array-element-delimiter: "|" # optional: the array element delimiter string for separating
                                    #   array and row element values (";" by default)
       escape-character: "\\"       # optional: escape character for escaping values (disabled by default)
       null-literal: "n/a"          # optional: null literal string that is interpreted as a
                                    #   null value (disabled by default)



#==============================================================================
# Execution properties
#==============================================================================

# Execution properties allow for changing the behavior of a table program.

execution:
   #planner: blink
   type: streaming              # 'batch' or 'streaming' execution
   result-mode: table           # 'changelog' or 'table' presentation of results
   parallelism: 1               # parallelism of the program
   max-parallelism: 128         # maximum parallelism
   min-idle-state-retention: 0  # minimum idle state retention in ms
   max-idle-state-retention: 0  # maximum idle state retention in ms

#==============================================================================
# Deployment properties
#==============================================================================

# Deployment properties describe the cluster to which table
# programs are submitted.

deployment:
   type: standalone             # only the 'standalone' deployment is supported
   response-timeout: 5000       # general cluster communication timeout in ms
   gateway-address: ""          # (optional) address from cluster to gateway
   gateway-port: 0              # (optional) port from cluster to gateway

[2]

bin/sql-client.sh embedded -d gh/sql-client-conf.yaml -l lib
Reading default environment from: file:/usr/local/swissbib/flink-1.9.1/gh/sql-client-conf.yaml
No session environment specified.
Validating current environment...

Exception in thread "main" org.apache.flink.table.client.SqlClientException: The configured environment is invalid. Please check your environment files again.
     at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:147)
     at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:194)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
     at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:562)
     at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:382)
     at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:144)
     ... 2 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: No factory supports all properties.

The following properties are requested:
connector.path=file:///home/swissbib/temp/trash/hello.txt
connector.type=filesystem
format.allow-comments=true
format.array-element-delimiter=|
format.derive-schema=true
format.escape-character=\\
format.field-delimiter=;
format.ignore-parse-errors=true
format.line-delimiter=\r\n
format.null-literal=n/a
format.quote-character='
format.schema=ROW(test STRING)
format.type=csv
update-mode=append

The following factories have been considered:
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
     at org.apache.flink.table.factories.TableFactoryService.filterBySupportedProperties(TableFactoryService.java:370)
     at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:197)
     at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
     at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:114)
     at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:265)
     at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$new$1(ExecutionContext.java:144)
     at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
     at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:142)
     at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:558)
     ... 4 more




Re: no suitable table factory for file sources

Posted by Jingsong Li <ji...@gmail.com>.
Hi Günter,

The file system connector currently only supports the old CSV format [1],
but your YAML uses properties of the new CSV format, such as "format.allow-comments".

Please check the properties supported by the old CSV format [2].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#file-system-connector
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#old-csv-format
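
For reference, here is a rough, untested sketch of what the table definition
could look like with the old CSV format. It assumes your file exposes a single
text column named "test" and keeps the ';' field delimiter from your config:

tables:
   - name: Guenter
     type: source-table
     update-mode: append
     connector:
       type: filesystem
       path: "file:///home/swissbib/temp/trash/hello.txt"
     format:
       type: csv                    # old CSV format shipped with flink-table
       fields:                      # format schema; the old CSV format needs it spelled out
         - name: test
           type: VARCHAR
       field-delimiter: ";"
       comment-prefix: "#"          # optional: skip lines starting with '#'
       ignore-parse-errors: true    # optional: skip malformed lines instead of failing
     schema:                        # table schema exposed to SQL queries
       - name: test
         type: VARCHAR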

Best,
Jingsong Lee
