You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by hieule <hi...@gmail.com> on 2020/07/13 08:54:38 UTC

pyflink connect mysql

Hi,
I have a problem when submitting a job:
```
java.lang.AbstractMethodError: Method
org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.consumeDataSet(Lorg/apache/flink/api/java/DataSet;)Lorg/apache/flink/api/java/operators/DataSink;
is abstract
```


My code :

```
import logging
import os
import shutil
import sys
import tempfile

from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, TableConfig,TableSink
from pyflink.table.descriptors import FileSystem, OldCsv, Schema
from pyflink.table.types import DataTypes
from pyflink.java_gateway import get_gateway
from pyflink.table.types import _to_java_type
from pyflink.util import utils

class JDBCAppendSink(TableSink):
    """Python-side wrapper that builds a Java ``JDBCAppendTableSink``.

    The sink is assembled through the JVM gateway with
    ``JDBCAppendTableSinkBuilder``, configured with the given field names
    and types, and handed to the ``TableSink`` base class.

    NOTE(review): the ``AbstractMethodError`` reported for
    ``JDBCAppendTableSink.consumeDataSet`` suggests the JDBC connector jar
    on the classpath may have been compiled against a different Flink
    version than the running cluster -- verify that the jar versions match.
    """

    def __init__(self, field_names: list, field_types: list,
                 driver_name: str, db_url: str, username: str,
                 password: str, query: str):
        """Create and configure the underlying Java JDBC sink.

        :param field_names: names of the sink columns, in order.
        :param field_types: PyFlink data types, one per entry of
            ``field_names``; also passed as the statement parameter types.
        :param driver_name: fully qualified JDBC driver class name.
        :param db_url: JDBC connection URL.
        :param username: database user.
        :param password: database password.
        :param query: parameterised SQL statement run for each record.
        """
        gateway = get_gateway()
        j_field_names = utils.to_jarray(gateway.jvm.String, field_names)
        j_field_types = utils.to_jarray(
            gateway.jvm.TypeInformation,
            [_to_java_type(field_type) for field_type in field_types])

        # Resolve the Java package first so the builder construction stays a
        # single, complete statement (the posted code broke the assignment
        # across two lines, which does not parse).
        jdbc_package = gateway.jvm.org.apache.flink.api.java.io.jdbc
        builder = jdbc_package.JDBCAppendTableSinkBuilder()
        builder.setUsername(username)
        builder.setPassword(password)
        builder.setDrivername(driver_name)
        builder.setDBUrl(db_url)
        builder.setParameterTypes(j_field_types)
        builder.setQuery(query)

        j_jdbc_sink = builder.build()
        j_jdbc_sink = j_jdbc_sink.configure(j_field_names, j_field_types)
        super().__init__(j_jdbc_sink)

def word_count():
    """Count rows per value of column ``b`` in the orders CSV and store them.

    Registers the CSV file as the temporary table ``orders``, registers a
    JDBC sink named ``bank_age``, then executes a batch job that groups
    ``orders`` by ``b`` and inserts ``(age, count_age)`` rows into the sink.
    """
    table_config = TableConfig()
    exec_env = ExecutionEnvironment.get_execution_environment()
    exec_env.set_parallelism(1)
    t_env = BatchTableEnvironment.create(exec_env, table_config)

    # register Results table in table environment
    result_path = tempfile.gettempdir() + '/result'
    source_path = "/home/hieulm/code/data/table_orders.csv"
    if os.path.exists(result_path):
        try:
            # Pick the removal routine that matches what is on disk.
            remover = (os.remove if os.path.isfile(result_path)
                       else shutil.rmtree)
            remover(result_path)
        except OSError as err:
            logging.error("Error removing directory: %s - %s.",
                          err.filename, err.strerror)

    logging.info("Read file source CSV: %s", source_path)
    connector = t_env.connect(FileSystem().path(source_path))

    csv_format = OldCsv()
    csv_format = csv_format.field_delimiter(',')
    csv_format = csv_format.field("a", DataTypes.STRING())
    csv_format = csv_format.field("b", DataTypes.BIGINT())
    csv_format = csv_format.field("c", DataTypes.BIGINT())
    csv_format = csv_format.field("rowtime", DataTypes.STRING())
    connector = connector.with_format(csv_format)

    table_schema = Schema()
    table_schema = table_schema.field("a", DataTypes.STRING())
    table_schema = table_schema.field("b", DataTypes.BIGINT())
    table_schema = table_schema.field("c", DataTypes.BIGINT())
    table_schema = table_schema.field("rowtime", DataTypes.STRING())
    connector = connector.with_schema(table_schema)

    connector.create_temporary_table("orders")

    logging.info("Results directory: DB bank_age")
    sink_columns = ["age", "count_age"]
    sink_types = [DataTypes.BIGINT(), DataTypes.BIGINT()]
    jdbc_sink = JDBCAppendSink(
        sink_columns,
        sink_types,
        "com.mysql.cj.jdbc.Driver",
        "jdbc:mysql://localhost/flink",
        "hieulm",
        "Csda@123",
        "insert into bank_age (age, count_age) values (?, ?)",
    )
    t_env.register_table_sink("bank_age", jdbc_sink)

    orders = t_env.scan("orders")
    grouped = orders.group_by("b")
    counts = grouped.select("b as age, count(b) as count_age ")
    counts.insert_into("bank_age")

    t_env.execute("bank_count_age")


if __name__ == '__main__':
    # Send bare log messages (no level/timestamp prefix) to stdout, then
    # run the batch job.
    log_options = {
        "stream": sys.stdout,
        "level": logging.INFO,
        "format": "%(message)s",
    }
    logging.basicConfig(**log_options)

    word_count()
```

How can I solve this problem?


Thanks,
hieule




--
Sent from: http://apache-flink.147419.n8.nabble.com/