Posted to user@flink.apache.org by xi sizhe <he...@gmail.com> on 2020/10/14 01:13:07 UTC

Required context properties mismatch in connecting the flink with mysql database

I am using the latest Flink (1.11.2) to work with a sample MySQL database,
and the database itself is working fine.

Additionally, I have added flink-connector-jdbc_2.11-1.11.2.jar,
mysql-connector-java-8.0.21.jar, and postgresql-42.2.17.jar to {FLINK}/lib.

Here is my code:

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import TableConfig, BatchTableEnvironment

T_CONFIG = TableConfig()
B_EXEC_ENV = ExecutionEnvironment.get_execution_environment()
B_EXEC_ENV.set_parallelism(1)
BT_ENV = BatchTableEnvironment.create(B_EXEC_ENV, T_CONFIG)

ddl = """
            CREATE TABLE nba_player4 (
                 first_name STRING,
                 last_name STRING,
                 email STRING,
                 id INT
            ) WITH (
                'connector' = 'jdbc',
                'url' = 'jdbc:mysql://localhost:3306/inventory',
                'username' = 'root',
                'password' = 'debezium',
                'table-name' = 'customers'
            )
      """
BT_ENV.sql_update(ddl)

sinkddl = """
        CREATE TABLE print_table (
         f0 INT,
         f1 INT,
         f2 STRING,
         f3 DOUBLE
        ) WITH (
         'connector' = 'print'
        )
      """
BT_ENV.sql_update(sinkddl)

BT_ENV.sql_query("SELECT first_name, last_name FROM nba_player4")
BT_ENV.execute("table_job")

However, when running the code, it comes up with this error:

py4j.protocol.Py4JJavaError: An error occurred while calling o23.sqlQuery.
: org.apache.flink.table.api.ValidationException: SQL validation
failed. findAndCreateTableSource failed.

Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
Could not find a suitable table factory for
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.

Reason: Required context properties mismatch.

The following properties are requested:
connector=jdbc
password=debezium
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=first_name
schema.1.data-type=VARCHAR(2147483647)
schema.1.name=last_name
schema.2.data-type=VARCHAR(2147483647)
schema.2.name=email
schema.3.data-type=INT
schema.3.name=id
table-name=customers
url=jdbc:mysql://localhost:3306/inventory
username=root

The following factories have been considered:
org.apache.flink.connector.jdbc.table.JdbcTableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.filesystem.FileSystemTableFactory

Update:

This is my docker-compose file:

version: '2.1'
services:
  jobmanager:
    build: .
    image: flink:latest
    hostname: "jobmanager"
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: flink:latest
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - jobmanager:jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  mysql:
    image: debezium/example-mysql
    ports:
     - "3306:3306"
    environment:
     - MYSQL_ROOT_PASSWORD=debezium
     - MYSQL_USER=mysqluser
     - MYSQL_PASSWORD=mysqlpw

The docker ps command shows:

CONTAINER ID   IMAGE                    COMMAND                  CREATED         STATUS         PORTS                               NAMES
cf84c84f7821   flink                    "/docker-entrypoint.…"   2 minutes ago   Up 2 minutes   6121-6123/tcp, 8081/tcp             _taskmanager_1
09b19142d70a   flink                    "/docker-entrypoint.…"   9 minutes ago   Up 9 minutes   6123/tcp, 0.0.0.0:8081->8081/tcp    _jobmanager_1
4ac01eb11bf7   debezium/example-mysql   "docker-entrypoint.s…"   3 days ago      Up 9 minutes   0.0.0.0:3306->3306/tcp, 33060/tcp   keras-flask-dep

More info:

*My current Flink environment* in Docker is flink:scala_2.12-java8:

docker pull flink:scala_2.12-java8

*The PyFlink JDBC connector* is flink-connector-jdbc_2.11-1.11.2.jar from the
Flink 1.11 release.

https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/jdbc.html

In order to use the JDBC connector, I tried two approaches:

   1. Save flink-connector-jdbc_2.11-1.11.2.jar into
      /usr/local/lib/python3.7/site-packages/pyflink/lib

   2. Configure the classpath in the Python app:

      base_dir = "/Users/huhu/Documents/projects/webapp/libs/"
      flink_jdbc_jar = f"file://{base_dir}flink-connector-jdbc_2.11-1.11.2.jar"
      BT_ENV.get_config().get_configuration().set_string("pipeline.jars", flink_jdbc_jar)

But I am still getting the same error.
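As a side note, Flink's pipeline.jars option takes a semicolon-separated list of file:// URLs, so if both the connector jar and the MySQL driver jar are needed, both have to be listed. A minimal pure-Python sketch of building such a value (build_pipeline_jars is a hypothetical helper written for illustration, not part of PyFlink):

```python
def build_pipeline_jars(base_dir, jar_names):
    # Build a value for Flink's 'pipeline.jars' option: a
    # semicolon-separated list of file:// URLs to the needed jars.
    if not base_dir.endswith("/"):
        base_dir += "/"
    return ";".join(f"file://{base_dir}{name}" for name in jar_names)

jars = build_pipeline_jars(
    "/Users/huhu/Documents/projects/webapp/libs",
    ["flink-connector-jdbc_2.11-1.11.2.jar", "mysql-connector-java-8.0.21.jar"],
)
print(jars)
# The result would then be passed to:
# BT_ENV.get_config().get_configuration().set_string("pipeline.jars", jars)
```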

Re: Required context properties mismatch in connecting the flink with mysql database

Posted by Dawid Wysakowicz <dw...@apache.org>.
Hi,

I think the problem is that you are using BatchTableEnvironment, which is
deprecated and does not support newer features such as the FLIP-95
sources/sinks. I am sorry it is not more prominent in the documentation.

I am not too familiar with the python API, and I am not sure if a
unified TableEnvironment is available there. In Java/Scala I'd recommend
using the unified TableEnvironment. If it is not available in python
API, you can use the StreamTableEnvironment, which actually extends the
unified one.

Moreover, please make sure you are using the same component versions;
otherwise you might face hard-to-track problems. You are mixing components
for different Scala versions (your cluster uses Scala 2.12, but you are
adding Scala 2.11 dependencies).
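To make that point concrete, one quick check is to look at the Scala suffix embedded in the jar file names. A small illustration with made-up files in a temporary directory (in practice you would run the grep against ${FLINK}/lib):

```shell
# Create a demo directory with two jars carrying different Scala suffixes.
mkdir -p /tmp/flink-lib-demo
touch /tmp/flink-lib-demo/flink-connector-jdbc_2.11-1.11.2.jar
touch /tmp/flink-lib-demo/flink-table_2.12-1.11.2.jar

# List the distinct Scala suffixes present.
# More than one line of output means mixed Scala versions.
ls /tmp/flink-lib-demo | grep -o '_2\.1[12]' | sort -u
```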

Best,

Dawid

On 14/10/2020 03:13, xi sizhe wrote:
> [quoted text of the original message trimmed]