You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by "mzz (Jira)" <ji...@apache.org> on 2020/06/08 13:06:00 UTC
[jira] [Created] (FLINK-18184) Could not find a suitable table
factory for 'org.apache.flink.table.factories.TableSourceFactory'
mzz created FLINK-18184:
---------------------------
Summary: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory'
Key: FLINK-18184
URL: https://issues.apache.org/jira/browse/FLINK-18184
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.9.1
Environment: local:macos
flink1.9
Reporter: mzz
// Reproduction script: registers a Kafka-backed, JSON-formatted table source named
// "aggs_test" through the descriptor API, queries it with SQL and prints its schema.
// TOPIC and VERSION are defined outside this snippet.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(5000) // checkpoint every 5000 msecs
// Kafka configuration
val properties = new Properties()
properties.setProperty("bootstrap.servers", "172.16.30.207:9092")
properties.setProperty("group.id", "km_aggs_group")
// Table environment settings: old planner, streaming mode.
val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
// DataStream-style Kafka consumer; only referenced by the commented-out addSource line
// below, so it is unused by the active code path.
val kafkaConsumer = new FlinkKafkaConsumer[String](TOPIC, new SimpleStringSchema(), properties).setStartFromEarliest()
// val source = env.addSource(kafkaConsumer)
val streamTableEnvironment = StreamTableEnvironment.create(env,fsSettings)
// Connector descriptor for the Kafka source.
streamTableEnvironment.connect(new Kafka()
.topic(TOPIC)
// NOTE(review): the reported error output lists connector.version=2.0, which is
// presumably the value of VERSION. This looks like a Kafka broker version rather than
// a connector version the Kafka table factory accepts (e.g. "universal") — TODO confirm
// against the Flink 1.9 Kafka connector documentation.
.version(VERSION)
.startFromEarliest()
.property("bootstrap.servers", "172.16.30.207:9092")
.property("zookeeper.connect", "172.16.30.207:2181")
.property("group.id", "km_aggs_group_table")
// .properties(properties)
)
// JSON format: fail on missing fields and derive the format schema from the table schema.
.withFormat(
new Json()
.failOnMissingField(true)
.deriveSchema()
)
// Table schema: eight fields, all declared as STRING.
.withSchema(new Schema()
.field("advs", Types.STRING())
.field("devs", Types.STRING())
.field("environment", Types.STRING())
.field("events", Types.STRING())
.field("identity", Types.STRING())
.field("ip", Types.STRING())
.field("launchs", Types.STRING())
.field("ts", Types.STRING())
)
.inAppendMode()
// According to the reported stack trace, this registerTableSource call is where the
// NoMatchingTableFactoryException is raised.
.registerTableSource("aggs_test")
val tableResult = streamTableEnvironment.sqlQuery("select * from aggs_test")
tableResult.printSchema()
// streamTableEnvironment.toAppendStream[Row](tableResult).print()
// Start the job
env.execute("test_kafka")
--------------------------------------------------------
Error message:
Exception in thread "main" org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
at KM.COM.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:70)
at KM.COM.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Reason: No context matches.
The following properties are requested:
connector.properties.0.key=zookeeper.connect
connector.properties.0.value=172.16.30.207:2181
connector.properties.1.key=group.id
connector.properties.1.value=km_aggs_group_table
connector.properties.2.key=bootstrap.servers
connector.properties.2.value=172.16.30.207:9092
connector.property-version=1
connector.startup-mode=earliest-offset
connector.topic=aggs_topic
connector.type=kafka
connector.version=2.0
format.derive-schema=true
format.fail-on-missing-field=true
format.property-version=1
format.type=json
schema.0.name=advs
schema.0.type=VARCHAR
schema.1.name=devs
schema.1.type=VARCHAR
schema.2.name=environment
schema.2.type=VARCHAR
schema.3.name=events
schema.3.type=VARCHAR
schema.4.name=identity
schema.4.type=VARCHAR
schema.5.name=ip
schema.5.type=VARCHAR
schema.6.name=launchs
schema.6.type=VARCHAR
schema.7.name=ts
schema.7.type=VARCHAR
update-mode=append
The following factories have been considered:
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)
... 4 more
I've tried the approaches described in the following Stack Overflow question, but they didn't solve my problem: [https://stackoverflow.com/questions/52500048/flink-could-not-find-a-suitable-table-factory-for-org-apache-flink-table-facto|https://stackoverflow.com/questions/52500048/flink-could-not-find-a-suitable-table-factory-for-org-apache-flink-table-facto]
Can anyone help me? Thanks!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)