Posted to dev@flink.apache.org by "mzz (Jira)" <ji...@apache.org> on 2020/06/08 13:06:00 UTC

[jira] [Created] (FLINK-18184) Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory'

mzz created FLINK-18184:
---------------------------

             Summary: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory'
                 Key: FLINK-18184
                 URL: https://issues.apache.org/jira/browse/FLINK-18184
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.9.1
         Environment: local: macOS, Flink 1.9
            Reporter: mzz


val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.enableCheckpointing(5000) // checkpoint every 5000 msecs

    // Kafka configuration
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "172.16.30.207:9092")
    properties.setProperty("group.id", "km_aggs_group")


    val fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
    val kafkaConsumer = new FlinkKafkaConsumer[String](TOPIC, new SimpleStringSchema(), properties).setStartFromEarliest()
    //    val source = env.addSource(kafkaConsumer)
    val streamTableEnvironment = StreamTableEnvironment.create(env,fsSettings)
    streamTableEnvironment.connect(new Kafka()
      .topic(TOPIC)
      .version(VERSION)
      .startFromEarliest()

      .property("bootstrap.servers", "172.16.30.207:9092")
      .property("zookeeper.connect", "172.16.30.207:2181")
      .property("group.id", "km_aggs_group_table")
      //      .properties(properties)
    )
      .withFormat(
        new Json()
          .failOnMissingField(true)
          .deriveSchema()
      )
      .withSchema(new Schema()
        .field("advs", Types.STRING())
        .field("devs", Types.STRING())
        .field("environment", Types.STRING())
        .field("events", Types.STRING())
        .field("identity", Types.STRING())
        .field("ip", Types.STRING())
        .field("launchs", Types.STRING())
        .field("ts", Types.STRING())
      )
      .inAppendMode()
      .registerTableSource("aggs_test")

    val tableResult = streamTableEnvironment.sqlQuery("select * from aggs_test")
    tableResult.printSchema()
//    streamTableEnvironment.toAppendStream[Row](tableResult).print()
    // Start the job
    env.execute("test_kafka")


--------------------------------------------------------
Error message:

Exception in thread "main" org.apache.flink.table.api.TableException: findAndCreateTableSource failed.
	at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:67)
	at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:54)
	at org.apache.flink.table.descriptors.ConnectTableDescriptor.registerTableSource(ConnectTableDescriptor.java:69)
	at KM.COM.KafakaHelper.FlinkTableConnKafka$.main(FlinkTableConnKafka.scala:70)
	at KM.COM.KafakaHelper.FlinkTableConnKafka.main(FlinkTableConnKafka.scala)
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: No context matches.

The following properties are requested:
connector.properties.0.key=zookeeper.connect
connector.properties.0.value=172.16.30.207:2181
connector.properties.1.key=group.id
connector.properties.1.value=km_aggs_group_table
connector.properties.2.key=bootstrap.servers
connector.properties.2.value=172.16.30.207:9092
connector.property-version=1
connector.startup-mode=earliest-offset
connector.topic=aggs_topic
connector.type=kafka
connector.version=2.0
format.derive-schema=true
format.fail-on-missing-field=true
format.property-version=1
format.type=json
schema.0.name=advs
schema.0.type=VARCHAR
schema.1.name=devs
schema.1.type=VARCHAR
schema.2.name=environment
schema.2.type=VARCHAR
schema.3.name=events
schema.3.type=VARCHAR
schema.4.name=identity
schema.4.type=VARCHAR
schema.5.name=ip
schema.5.type=VARCHAR
schema.6.name=launchs
schema.6.type=VARCHAR
schema.7.name=ts
schema.7.type=VARCHAR
update-mode=append

The following factories have been considered:
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.planner.StreamPlannerFactory
org.apache.flink.table.executor.StreamExecutorFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
	at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
	at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
	at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
	at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
	at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)
	... 4 more
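
For readers of the trace above: "No context matches" suggests that none of the considered factories declares a required context that agrees with the requested properties. The sketch below is a standalone illustration of that kind of comparison using plain Scala maps; it does not call Flink, and the names ContextMatchSketch, mismatches and assumedKafkaContext are hypothetical. The values in assumedKafkaContext are an assumption (that the Kafka factory on the classpath is the universal connector, which would expect connector.version=universal); the trace itself does not confirm this.

```scala
// Standalone sketch; no Flink dependency. All names here are hypothetical.
object ContextMatchSketch {

  // Returns the required entries that the request does not satisfy:
  // key -> (value the factory requires, value found in the request, if any).
  def mismatches(
      required: Map[String, String],
      requested: Map[String, String]): Map[String, (String, Option[String])] =
    required.collect {
      case (key, value) if !requested.get(key).contains(value) =>
        (key, (value, requested.get(key)))
    }

  def main(args: Array[String]): Unit = {
    // Subset of the requested properties listed in the error message.
    val requested = Map(
      "connector.type" -> "kafka",
      "connector.version" -> "2.0",
      "update-mode" -> "append")

    // ASSUMPTION: context a universal Kafka factory might require.
    // These values are not taken from the trace.
    val assumedKafkaContext = Map(
      "connector.type" -> "kafka",
      "connector.version" -> "universal",
      "update-mode" -> "append")

    // Print every required entry that the request fails to match.
    mismatches(assumedKafkaContext, requested).foreach {
      case (key, (want, got)) =>
        println(s"$key: factory requires '$want', request has '${got.getOrElse("<missing>")}'")
    }
  }
}
```

If the assumption holds, the only entry that differs is connector.version, so checking the value passed to .version(VERSION) against the versions supported by the Kafka connector jar on the classpath would be a reasonable first step.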

I've tried the suggestions in the following Stack Overflow question, but they didn't solve my problem: https://stackoverflow.com/questions/52500048/flink-could-not-find-a-suitable-table-factory-for-org-apache-flink-table-facto
Can anyone help? Thanks!




