You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@flink.apache.org by Ravi Sankar Reddy Sangana <Ra...@radware.com> on 2020/10/03 10:58:19 UTC
Flink table error in 1.11 "Could not find a suitable table factory "
Hi Team,
I am trying out Flink 1.11 by following the official documentation and examples; my code and pom.xml are included below the stack trace.
When I run the job, I get this error:
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Reason: Required context properties mismatch.
The matching candidates:
org.apache.flink.table.sources.CsvAppendTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'
'format.type' expects 'csv', but is 'json'
The following properties are requested:
connector.properties.auto.offset.reset=latest
connector.properties.bootstrap.servers=localhost:9092
connector.properties.group.id=testflinktopurl
connector.property-version=1
connector.startup-mode=latest-offset
connector.topic=badbotstream
connector.type=kafka
connector.version=0.11
format.fail-on-missing-field=false
format.property-version=1
format.type=json
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=sid
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=_zpsbd2
schema.10.data-type=VARCHAR(2147483647)
schema.10.name=noqryurl
schema.11.data-type=VARCHAR(2147483647)
schema.11.name=_zpsbd8
schema.2.data-type=VARCHAR(2147483647)
schema.2.name=_zpsbd3
schema.3.data-type=VARCHAR(2147483647)
schema.3.name=_zpsbd6
schema.4.data-type=VARCHAR(2147483647)
schema.4.name=_zpsbd7
schema.5.data-type=VARCHAR(2147483647)
schema.5.name=sign
schema.6.data-type=VARCHAR(2147483647)
schema.6.name=rule
schema.7.data-type=VARCHAR(2147483647)
schema.7.name=_blockdigest
schema.8.data-type=VARCHAR(2147483647)
schema.8.name=recvdTime
schema.9.data-type=VARCHAR(2147483647)
schema.9.name=botcode
update-mode=append
The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.filesystem.FileSystemTableFactory
at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322) ~[flink-table_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190) ~[flink-table_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143) ~[flink-table_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96) ~[flink-table_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46) ~[flink-table_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.table.catalog.DatabaseCalciteSchema.convertCatalogTable(DatabaseCalciteSchema.java:162) ~[flink-table_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.table.catalog.DatabaseCalciteSchema.convertTable(DatabaseCalciteSchema.java:107) ~[flink-table_2.11-1.11.2.jar:1.11.2]
at org.apache.flink.table.catalog.DatabaseCalciteSchema.lambda$getTable$0(DatabaseCalciteSchema.java:91) ~[flink-table_2.11-1.11.2.jar:1.11.2]
at java.util.Optional.map(Optional.java:215) ~[?:1.8.0_265]
at org.apache.flink.table.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:82) ~[flink-table_2.11-1.11.2.jar:1.11.2]
at org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83) ~[flink-table_2.11-1.11.2.jar:1.11.2]
at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289) ~[flink-table_2.11-1.11.2.jar:1.11.2]
StreamingJob.java (main class)
// Build a streaming table environment that uses the legacy ("old") planner.
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
// TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings); //Model 1
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

// Register the Kafka topic "badbotstream" as an append-mode temporary table named "badbot",
// reading JSON records whose fields are all declared as STRING.
// NOTE(review): the descriptor requests connector.version "0.11". The factory considered in the
// error output (org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory) comes from
// flink-connector-kafka_2.11, which presumably only matches version "universal"; a "0.11" source
// would need flink-connector-kafka-0.11_2.11 on the classpath. Confirm against the Flink 1.11 docs.
fsTableEnv.connect(new Kafka()
                .version("0.11")
                .topic("badbotstream")
                .property("auto.offset.reset", "latest")
                .property("bootstrap.servers", "localhost:9092")
                .property("group.id", "testflinktopurl")
                .startFromLatest())
        .withFormat(new Json().failOnMissingField(false))
        .withSchema(new Schema()
                .field("sid", DataTypes.STRING())
                .field("_zpsbd2", DataTypes.STRING())
                .field("_zpsbd3", DataTypes.STRING())
                .field("_zpsbd6", DataTypes.STRING())
                .field("_zpsbd7", DataTypes.STRING())
                .field("sign", DataTypes.STRING())
                .field("rule", DataTypes.STRING())
                .field("_blockdigest", DataTypes.STRING())
                .field("recvdTime", DataTypes.STRING())
                .field("botcode", DataTypes.STRING())
                .field("noqryurl", DataTypes.STRING())
                .field("_zpsbd8", DataTypes.STRING()))
        .inAppendMode()
        .createTemporaryTable("badbot");

// TableResult dsRow = fsTableEnv.executeSql("select * from badbot where sid = '4859'"); //Model 1
// Query the registered table and convert the result to an append-only DataStream of rows.
Table badBot = fsTableEnv.sqlQuery("select * from badbot where sid = '4859'");
// The terminating semicolon was missing on this statement in the original snippet.
DataStream<Row> dsRow = fsTableEnv.toAppendStream(badBot, Row.class);
dsRow.print();

// Submit the job under the name "pacifier".
fsEnv.execute("pacifier");
Pom.xml
<!-- Maven dependencies of the job; every artifact is pinned to ${flink.version} and built for Scala 2.11. -->
<dependencies>
<!-- DataStream API (StreamExecutionEnvironment, DataStream). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- JSON format; presumably backs the Json() format descriptor used by the job. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<!-- Table API bridge for Java DataStream programs (StreamTableEnvironment). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- NOTE(review): this appears to be the "universal" Kafka connector; a table descriptor that
     requests connector.version=0.11 presumably needs flink-connector-kafka-0.11_2.11 instead
     (or the descriptor should request version "universal"). Confirm against the Flink 1.11 docs. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Legacy ("old") table planner, matching useOldPlanner() in the job's EnvironmentSettings. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
</dependencies>