You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "leishuiyu (JIRA)" <ji...@apache.org> on 2019/05/16 09:01:00 UTC

[jira] [Created] (FLINK-12531) flink sql-client throw NoMatchingTableFactoryException

leishuiyu created FLINK-12531:
---------------------------------

             Summary: flink sql-client throw NoMatchingTableFactoryException
                 Key: FLINK-12531
                 URL: https://issues.apache.org/jira/browse/FLINK-12531
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Client
    Affects Versions: 1.7.2
            Reporter: leishuiyu


1. Start the SQL Client with a session environment file: bin/sql-client.sh embedded -e conf/sql-client-cp.yaml

2. Contents of sql-client-cp.yaml:
{code:java}
// code placeholder
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################


# This file defines the default environment for Flink's SQL Client.
# Defaults might be overwritten by a session specific environment.


# See the Table API & SQL documentation for details about supported properties.


#==============================================================================
# Tables
#==============================================================================

# Define tables here such as sources, sinks, views, or temporal tables.

tables: [] # empty list
# A typical table source definition looks like:
# - name: ...
# type: source-table
# connector: ...
# format: ...
# schema: ...

# A typical view definition looks like:
# - name: ...
# type: view
# query: "SELECT ..."

# A typical temporal table definition looks like:
# - name: ...
# type: temporal-table
# history-table: ...
# time-attribute: ...
# primary-key: ...

#==============================================================================
# User-defined functions
#==============================================================================

tables:
- name: MyUserTable # name the new table
type: source # declare if the table should be "source", "sink", or "both"
update-mode: append # specify the update-mode for streaming tables

# declare the external system to connect to
connector:
type: kafka
version: "0.11"
topic: test-input
startup-mode: earliest-offset
properties:
- key: zookeeper.connect
value: centos-6:2181
- key: bootstrap.servers
value: centos-6:9092

# declare a format for this system
format:
type: avro
avro-schema: >
{
"namespace": "org.myorganization",
"type": "record",
"name": "UserMessage",
"fields": [
{"name": "ts", "type": "string"},
{"name": "user", "type": "long"},
{"name": "message", "type": ["string", "null"]}
]
}

# declare the schema of the table
schema:
- name: rowtime
type: TIMESTAMP
rowtime:
timestamps:
type: from-field
from: ts
watermarks:
type: periodic-bounded
delay: "60000"
- name: user
type: BIGINT
- name: message
type: VARCHAR
# Define scalar, aggregate, or table functions here.

functions: [] # empty list
# A typical function definition looks like:
# - name: ...
# from: class
# class: ...
# constructor: ...

#==============================================================================
# Execution properties
#==============================================================================

# Execution properties allow for changing the behavior of a table program.

execution:
# 'batch' or 'streaming' execution
type: streaming
# allow 'event-time' or only 'processing-time' in sources
time-characteristic: event-time
# interval in ms for emitting periodic watermarks
periodic-watermarks-interval: 200
# 'changelog' or 'table' presentation of results
result-mode: table
# maximum number of maintained rows in 'table' presentation of results
max-table-result-rows: 1000000
# parallelism of the program
parallelism: 1
# maximum parallelism
max-parallelism: 128
# minimum idle state retention in ms
min-idle-state-retention: 0
# maximum idle state retention in ms
max-idle-state-retention: 0
# controls how table programs are restarted in case of a failures
restart-strategy:
# strategy type
# possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
type: fallback


#==============================================================================
# Deployment properties
#==============================================================================

# Deployment properties allow for describing the cluster to which table
# programs are submitted to.

deployment:
# general cluster communication timeout in ms
response-timeout: 5000
# (optional) address from cluster to gateway
gateway-address: ""
# (optional) port from cluster to gateway
gateway-port: 0
{code}
3. The log shows:

 
{code:java}

// code placeholder
No default environment specified.
Searching for '/opt/soft/flink-1.7.1/conf/sql-client-defaults.yaml'...found.
Reading default environment from: file:/opt/soft/flink-1.7.1/conf/sql-client-defaults.yaml
Reading session environment from: file:/opt/soft/flink-1.7.1/conf/sql-client-cp.yaml
Validating current environment...

Exception in thread "main" org.apache.flink.table.client.SqlClientException: The configured environment is invalid. Please check your environment files again.
at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:140)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:99)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:187)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:488)
at org.apache.flink.table.client.gateway.local.LocalExecutor.validateSession(LocalExecutor.java:316)
at org.apache.flink.table.client.SqlClient.validateEnvironment(SqlClient.java:137)
... 2 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.StreamTableSourceFactory' in the classpath.

Reason: No context matches.

The following properties are requested:
connector.properties.0.key=zookeeper.connect
connector.properties.0.value=centos-6:2181
connector.properties.1.key=bootstrap.servers
connector.properties.1.value=centos-6:9092
connector.startup-mode=earliest-offset
connector.topic=test-input
connector.type=kafka
connector.version=0.11
format.avro-schema={\n \"namespace\": \"org.myorganization\",\n \"type\": \"record\",\n \"name\": \"UserMessage\",\n \"fields\": [\n {\"name\": \"ts\", \"type\": \"string\"},\n {\"name\": \"user\", \"type\": \"long\"},\n {\"name\": \"message\", \"type\": [\"string\", \"null\"]}\n ]\n}\n
format.type=avro
schema.0.name=rowtime
schema.0.rowtime.timestamps.from=ts
schema.0.rowtime.timestamps.type=from-field
schema.0.rowtime.watermarks.delay=60000
schema.0.rowtime.watermarks.type=periodic-bounded
schema.0.type=TIMESTAMP
schema.1.name=user
schema.1.type=BIGINT
schema.2.name=message
schema.2.type=VARCHAR
update-mode=append

The following factories have been considered:
org.apache.flink.formats.avro.AvroRowFormatFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory

at org.apache.flink.table.factories.TableFactoryService$.filterByContext(TableFactoryService.scala:214)
at org.apache.flink.table.factories.TableFactoryService$.findInternal(TableFactoryService.scala:130)
at org.apache.flink.table.factories.TableFactoryService$.find(TableFactoryService.scala:100)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.scala)
at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:236)
at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$new$0(ExecutionContext.java:121)
at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:119)
at org.apache.flink.table.client.gateway.local.LocalExecutor.getOrCreateExecutionContext(LocalExecutor.java:484)
... 4 more
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)