Posted to user@flink.apache.org by xi sizhe <he...@gmail.com> on 2020/10/14 01:13:07 UTC
Required context properties mismatch in connecting the flink with mysql database
I am using the latest Flink (1.11.2) to work with a sample MySQL database, and
the database itself is working fine.
Additionally, I have added flink-connector-jdbc_2.11-1.11.2.jar,
mysql-connector-java-8.0.21.jar, and postgresql-42.2.17.jar to {FLINK}/lib.
Here is my code:

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, TableConfig

T_CONFIG = TableConfig()
B_EXEC_ENV = ExecutionEnvironment.get_execution_environment()
B_EXEC_ENV.set_parallelism(1)
BT_ENV = BatchTableEnvironment.create(B_EXEC_ENV, T_CONFIG)

ddl = """
    CREATE TABLE nba_player4 (
        first_name STRING,
        last_name STRING,
        email STRING,
        id INT
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://localhost:3306/inventory',
        'username' = 'root',
        'password' = 'debezium',
        'table-name' = 'customers'
    )
"""
BT_ENV.sql_update(ddl)

sinkddl = """
    CREATE TABLE print_table (
        f0 INT,
        f1 INT,
        f2 STRING,
        f3 DOUBLE
    ) WITH (
        'connector' = 'print'
    )
"""
BT_ENV.sql_update(sinkddl)

BT_ENV.sql_query("SELECT first_name, last_name FROM nba_player4")
BT_ENV.execute("table_job")
However, when running the code, it comes up with an error:
py4j.protocol.Py4JJavaError: An error occurred while calling o23.sqlQuery.
: org.apache.flink.table.api.ValidationException: SQL validation
failed. findAndCreateTableSource failed.
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.
Reason: Required context properties mismatch.
The following properties are requested:
connector=jdbc
password=debezium
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=first_name
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=last_name
schema.2.data-type=VARCHAR(2147483647)
schema.2.name=email
schema.3.data-type=INT
schema.3.name=id
table-name=customers
url=jdbc:mysql://localhost:3306/inventory
username=root
The following factories have been considered:
org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.filesystem.FileSystemTableFactory
Update:
This is my docker-compose YAML file:
version: '2.1'
services:
  jobmanager:
    build: .
    image: flink:latest
    hostname: "jobmanager"
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink:latest
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - jobmanager:jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  mysql:
    image: debezium/example-mysql
    ports:
      - "3306:3306"
    environment:
      - MYSQL_ROOT_PASSWORD=debezium
      - MYSQL_USER=mysqluser
      - MYSQL_PASSWORD=mysqlpw
The docker ps command shows:
CONTAINER ID  IMAGE                   COMMAND                  CREATED        STATUS        PORTS                              NAMES
cf84c84f7821  flink                   "/docker-entrypoint.…"   2 minutes ago  Up 2 minutes  6121-6123/tcp, 8081/tcp            _taskmanager_1
09b19142d70a  flink                   "/docker-entrypoint.…"   9 minutes ago  Up 9 minutes  6123/tcp, 0.0.0.0:8081->8081/tcp   _jobmanager_1
4ac01eb11bf7  debezium/example-mysql  "docker-entrypoint.s…"   3 days ago     Up 9 minutes  0.0.0.0:3306->3306/tcp, 33060/tcp  keras-flask-dep
More info:
*my current flink environment* in docker is flink:scala_2.12-java8
docker pull flink:scala_2.12-java8
*pyflink jdbc connector* is flink-connector-jdbc_2.11-1.11.2.jar from the
Flink 1.11 release:
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html
In order to use the JDBC library, I tried two ways:
1.
Save flink-connector-jdbc_2.11-1.11.2.jar into
/usr/local/lib/python3.7/site-packages/pyflink/lib
2.
Configure the classpath in the Python app:
base_dir = "/Users/huhu/Documents/projects/webapp/libs/"
flink_jdbc_jar = f"file://{base_dir}flink-connector-jdbc_2.11-1.11.2.jar"
BT_ENV.get_config().get_configuration().set_string("pipeline.jars", flink_jdbc_jar)
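As a side note, pipeline.jars takes a semicolon-separated list of jar URLs, so
several jars can be passed in one setting. A minimal sketch of building that
value (the paths and jar names below are assumptions taken from this thread):

```python
# Build a semicolon-separated pipeline.jars value from several jar files.
# Paths and jar names are illustrative assumptions, not verified locations.
from pathlib import Path

base_dir = Path("/Users/huhu/Documents/projects/webapp/libs")
jar_names = [
    "flink-connector-jdbc_2.11-1.11.2.jar",
    "mysql-connector-java-8.0.21.jar",
]
# Each entry becomes a file:// URL; entries are joined with ';'.
jars = ";".join(f"file://{base_dir / name}" for name in jar_names)
# BT_ENV.get_config().get_configuration().set_string("pipeline.jars", jars)
print(jars)
```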
But I am still getting the same error.
Re: Required context properties mismatch in connecting the flink with mysql database
Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi,
I think the problem is that you are using the BatchTableEnvironment, which is
deprecated and does not support newer features such as FLIP-95
sources/sinks. I am sorry this is not more prominent in the documentation.
I am not too familiar with the Python API, and I am not sure whether a
unified TableEnvironment is available there. In Java/Scala I'd recommend
using the unified TableEnvironment. If it is not available in the Python
API, you can use the StreamTableEnvironment, which actually extends the
unified one.
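To make this concrete, here is a sketch of the suggested change in PyFlink 1.11,
using the StreamTableEnvironment with the Blink planner instead of the deprecated
BatchTableEnvironment; the table names and connection options are taken from the
original message, and a running Flink cluster plus the MySQL container are assumed:

```python
# Sketch only: requires a Flink 1.11 setup with the JDBC connector and
# MySQL driver on the classpath, and the MySQL container from this thread.
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

env = StreamExecutionEnvironment.get_execution_environment()
settings = (
    EnvironmentSettings.new_instance()
    .in_streaming_mode()
    .use_blink_planner()
    .build()
)
t_env = StreamTableEnvironment.create(env, environment_settings=settings)

# The FLIP-95 'connector' = 'jdbc' option is understood by this environment.
t_env.execute_sql("""
    CREATE TABLE nba_player4 (
        first_name STRING,
        last_name STRING,
        email STRING,
        id INT
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://localhost:3306/inventory',
        'username' = 'root',
        'password' = 'debezium',
        'table-name' = 'customers'
    )
""")

t_env.execute_sql("""
    CREATE TABLE print_table (
        first_name STRING,
        last_name STRING
    ) WITH ('connector' = 'print')
""")

# execute_sql submits the INSERT job to the cluster.
t_env.execute_sql(
    "INSERT INTO print_table SELECT first_name, last_name FROM nba_player4"
)
```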
Moreover, please, please make sure you are using the same component
versions, or otherwise you might face hard-to-track problems. You are
mixing components for different Scala versions: your cluster uses Scala
2.12, but you are adding Scala 2.11 dependencies.
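A quick way to illustrate this check: scan the jar file names for their Scala
suffix; more than one distinct version means the deployment mixes Scala builds.
The jar names below are assumptions based on this thread (a scala_2.12 image
combined with a _2.11 connector):

```python
# Detect mixed Scala-version suffixes in a list of jar names.
import re

jars = [
    "flink-connector-jdbc_2.11-1.11.2.jar",  # jar added by the user
    "flink-dist_2.12-1.11.2.jar",            # assumed jar from a scala_2.12 image
]
# Extract the "_2.xx-" suffix that Flink artifacts carry.
scala_versions = {m.group(1) for j in jars if (m := re.search(r"_(2\.\d+)-", j))}
print(scala_versions)  # more than one entry means mixed Scala versions
```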
Best,
Dawid
On 14/10/2020 03:13, xi sizhe wrote:
> [quoted text of the original message omitted]