You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by hieule <hi...@gmail.com> on 2020/07/13 08:54:38 UTC
pyflink connect mysql
Hi,
I have a problem when submitting a job:
```
java.lang.AbstractMethodError: Method
org/apache/flink/api/java/io/jdbc/JDBCAppendTableSink.consumeDataSet(Lorg/apache/flink/api/java/DataSet;)Lorg/apache/flink/api/java/operators/DataSink;
is abstract
```
My code:
```
import logging
import os
import shutil
import sys
import tempfile
from pyflink.dataset import ExecutionEnvironment
from pyflink.table import BatchTableEnvironment, TableConfig,TableSink
from pyflink.table.descriptors import FileSystem, OldCsv, Schema
from pyflink.table.types import DataTypes
from pyflink.java_gateway import get_gateway
from pyflink.table.types import _to_java_type
from pyflink.util import utils
class JDBCAppendSink(TableSink):
    """Python wrapper around the Java ``JDBCAppendTableSink``.

    The Java sink is assembled with ``JDBCAppendTableSinkBuilder`` through the
    JVM gateway, configured with the supplied field names and types, and then
    handed to the ``TableSink`` base class.
    """

    def __init__(self, field_names: list, field_types: list, driver_name:
                 str, db_url: str, username: str, password: str, query: str):
        """Build and configure the underlying Java JDBC sink.

        :param field_names: column names; converted to a Java ``String`` array.
        :param field_types: column types; each one is passed through
            ``_to_java_type`` and collected into a Java ``TypeInformation``
            array that serves both as the statement parameter types and as
            the sink's field types.
        :param driver_name: value passed to the builder's ``setDrivername``.
        :param db_url: value passed to the builder's ``setDBUrl``.
        :param username: value passed to the builder's ``setUsername``.
        :param password: value passed to the builder's ``setPassword``.
        :param query: value passed to the builder's ``setQuery``.
        """
        jvm = get_gateway().jvm

        # Convert the Python-side schema into the Java arrays the sink expects.
        java_names = utils.to_jarray(jvm.String, field_names)
        java_types = utils.to_jarray(
            jvm.TypeInformation,
            list(map(_to_java_type, field_types)))

        sink_builder = \
            jvm.org.apache.flink.api.java.io.jdbc.JDBCAppendTableSinkBuilder()
        sink_builder.setUsername(username)
        sink_builder.setPassword(password)
        sink_builder.setDrivername(driver_name)
        sink_builder.setDBUrl(db_url)
        sink_builder.setParameterTypes(java_types)
        sink_builder.setQuery(query)

        # configure() returns the sink instance that is actually registered.
        configured_sink = sink_builder.build().configure(java_names, java_types)
        super().__init__(configured_sink)
def word_count():
    """Count rows per value of column ``b`` in a CSV file and write to JDBC.

    Registers the CSV file as the temporary table ``orders``, registers a
    ``JDBCAppendSink`` under the name ``bank_age``, then groups ``orders`` by
    ``b`` and inserts ``(age, count_age)`` rows into that sink before
    executing the job ``bank_count_age``.
    """
    table_config = TableConfig()
    exec_env = ExecutionEnvironment.get_execution_environment()
    exec_env.set_parallelism(1)
    t_env = BatchTableEnvironment.create(exec_env, table_config)

    # Remove a stale "result" entry from the temp directory. This path is not
    # referenced again later in this function.
    result_path = tempfile.gettempdir() + '/result'
    source_path = "/home/hieulm/code/data/table_orders.csv"
    if os.path.exists(result_path):
        try:
            remover = os.remove if os.path.isfile(result_path) \
                else shutil.rmtree
            remover(result_path)
        except OSError as err:
            logging.error("Error removing directory: %s - %s.", err.filename,
                          err.strerror)

    logging.info("Read file source CSV: %s", source_path)

    # Column layout shared by the CSV format and the table schema; each entry
    # is (name, DataTypes factory) and the factory is called once per use.
    columns = (
        ("a", DataTypes.STRING),
        ("b", DataTypes.BIGINT),
        ("c", DataTypes.BIGINT),
        ("rowtime", DataTypes.STRING),
    )

    descriptor = t_env.connect(FileSystem().path(source_path))

    csv_format = OldCsv().field_delimiter(',')
    for column_name, make_type in columns:
        csv_format = csv_format.field(column_name, make_type())
    descriptor = descriptor.with_format(csv_format)

    table_schema = Schema()
    for column_name, make_type in columns:
        table_schema = table_schema.field(column_name, make_type())
    descriptor = descriptor.with_schema(table_schema)

    descriptor.create_temporary_table("orders")

    logging.info("Results directory: DB bank_age")

    # JDBC sink receiving one (age, count_age) row per group.
    jdbc_sink = JDBCAppendSink(
        ["age", "count_age"],
        [DataTypes.BIGINT(), DataTypes.BIGINT()],
        "com.mysql.cj.jdbc.Driver",
        "jdbc:mysql://localhost/flink",
        "hieulm",
        "Csda@123",
        "insert into bank_age (age, count_age) values (?, ?)")
    t_env.register_table_sink("bank_age", jdbc_sink)

    orders = t_env.scan("orders")
    age_counts = orders.group_by("b").select("b as age, count(b) as count_age ")
    age_counts.insert_into("bank_age")

    t_env.execute("bank_count_age")
if __name__ == '__main__':
    # Script entry point: send INFO-level log messages (message text only)
    # to stdout, then run the job.
    logging.basicConfig(
        stream=sys.stdout,
        level=logging.INFO,
        format="%(message)s",
    )
    word_count()
```
How can I solve this problem?
Thanks,
hieule
--
Sent from: http://apache-flink.147419.n8.nabble.com/