You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user-zh@flink.apache.org by Zhou Zach <wa...@163.com> on 2020/06/10 08:32:22 UTC

sink mysql 失败

SLF4J: Class path contains multiple SLF4J bindings.

SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]

SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.

SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.

Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.

Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.

Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.

Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.

Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.

Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.

Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.

Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.

Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: c53425963e2c88e84a2e699732565e1a)

at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)

at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)

at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)

at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)

at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)

at org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)

at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)

at org.rabbit.streaming.kafka.FromKafkaSinkJdbcByJDBCAppendTableSink$.main(FromKafkaSinkJdbcByJDBCAppendTableSink.scala:74)

at org.rabbit.streaming.kafka.FromKafkaSinkJdbcByJDBCAppendTableSink.main(FromKafkaSinkJdbcByJDBCAppendTableSink.scala)

Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: c53425963e2c88e84a2e699732565e1a)

at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)

at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)

at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)

at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)

at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)

at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)

at akka.dispatch.OnComplete.internal(Future.scala:264)

at akka.dispatch.OnComplete.internal(Future.scala:261)

at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)

at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)

at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)

at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)

at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)

at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)

at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)

at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)

at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)

at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)

at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)

at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)

at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)

at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)

at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)

at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)

at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)

at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)

at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)

at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)

at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)

at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)

... 31 more

Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy

at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)

at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)

at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)

at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)

at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)

at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)

at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)

at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)

at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)

at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)

at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)

at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)

at akka.actor.Actor$class.aroundReceive(Actor.scala:517)

at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)

at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)

at akka.actor.ActorCell.invoke(ActorCell.scala:561)

at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)

at akka.dispatch.Mailbox.run(Mailbox.scala:225)

at akka.dispatch.Mailbox.exec(Mailbox.scala:235)

... 4 more

Caused by: java.lang.IllegalArgumentException: open() failed.

at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.open(JDBCOutputFormat.java:74)

at org.apache.flink.api.java.io.jdbc.JDBCSinkFunction.open(JDBCSinkFunction.java:55)

at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)

at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)

at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)

at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)

at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)

at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)

at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

at java.lang.Thread.run(Thread.java:745)

Caused by: java.sql.SQLException: Access denied for user ''@'localhost' (using password: NO)

at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)

at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976)

at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912)

at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:871)

at com.mysql.jdbc.MysqlIO.proceedHandshakeWithPluggableAuthentication(MysqlIO.java:1714)

at com.mysql.jdbc.MysqlIO.doHandshake(MysqlIO.java:1224)

at com.mysql.jdbc.ConnectionImpl.coreConnect(ConnectionImpl.java:2190)

at com.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2221)

at com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2016)

at com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:776)

at com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:47)

at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)

at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)

at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)

at java.lang.reflect.Constructor.newInstance(Constructor.java:423)

at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)

at com.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:386)

at com.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:330)

at java.sql.DriverManager.getConnection(DriverManager.java:664)

at java.sql.DriverManager.getConnection(DriverManager.java:270)

at org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.establishConnection(AbstractJDBCOutputFormat.java:68)

at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.open(JDBCOutputFormat.java:71)

... 12 more




query:
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = TableEnvironment.create(settings)

        tableEnv.sqlUpdate(
"""
            |
            |CREATE TABLE user_log (
            |    id INT
            |) WITH (
            |    'connector.type' = 'kafka',
            |    'connector.version' = 'universal',
            |    'connector.topic' = 'num',
            |    'connector.startup-mode' = 'latest-offset',
            |    'connector.properties.0.key' = 'zookeeper.connect',
            |    'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
            |    'connector.properties.1.key' = 'bootstrap.servers',
            |    'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
            |    'update-mode' = 'append',
            |    'format.type' = 'json',
            |    'format.derive-schema' = 'true'
            |)
            |""".stripMargin)

val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
      .setDrivername("com.mysql.jdbc.Driver")
      .setDBUrl("jdbc:mysql://localhost:3306/dashboard")
       .setQuery("INSERT INTO t2(id) VALUES (?)")
.setParameterTypes(BasicTypeInfo.INT_TYPE_INFO)
      .build()

    tableEnv.registerTableSink(
"jdbcOutputTable",
// specify table schema
Array[String]("id"),
Array[TypeInformation[_]](Types.INT),
sink)

    tableEnv.sqlQuery("select id from user_log")
      .insertInto("jdbcOutputTable")

    tableEnv.execute("from kafka sink mysql")

Re: sink mysql 失败

Posted by 李奇 <35...@qq.com>.
用户名密码没有设置。

> 在 2020年6月10日,下午5:42,Zhou Zach <wa...@163.com> 写道:
> 
> 感谢回复!忘记设置用户名和密码了。。
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> At 2020-06-10 16:54:43, "wangweiguang@stevegame.cn" <wa...@stevegame.cn> wrote:
>> 
>> Caused by: java.sql.SQLException: Access denied for user ''@'localhost' (using password: NO)
>> 得指定下有操作mysql这个表的权限账号了!
>> 
>> 
>> 
>> 发件人: Zhou Zach
>> 发送时间: 2020-06-10 16:32
>> 收件人: Flink user-zh mailing list
>> 主题: sink mysql 失败
>> SLF4J: Class path contains multiple SLF4J bindings.
>> 
>> SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> 
>> SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>> 
>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
>> 
>> SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
>> 
>> ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
>> 
>> Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
>> 
>> Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
>> 
>> Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
>> 
>> Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
>> 
>> Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
>> 
>> Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
>> 
>> Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
>> 
>> Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
>> 
>> Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: c53425963e2c88e84a2e699732565e1a)
>> 
>> at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>> 
>> at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>> 
>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)
>> 
>> at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
>> 
>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>> 
>> at org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
>> 
>> at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
>> 
>> at org.rabbit.streaming.kafka.FromKafkaSinkJdbcByJDBCAppendTableSink$.main(FromKafkaSinkJdbcByJDBCAppendTableSink.scala:74)
>> 
>> at org.rabbit.streaming.kafka.FromKafkaSinkJdbcByJDBCAppendTableSink.main(FromKafkaSinkJdbcByJDBCAppendTableSink.scala)
>> 
>> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: c53425963e2c88e84a2e699732565e1a)
>> 
>> at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
>> 
>> at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>> 
>> at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>> 
>> at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>> 
>> at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>> 
>> at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
>> 
>> at akka.dispatch.OnComplete.internal(Future.scala:264)
>> 
>> at akka.dispatch.OnComplete.internal(Future.scala:261)
>> 
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
>> 
>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
>> 
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>> 
>> at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
>> 
>> at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>> 
>> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>> 
>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
>> 
>> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
>> 
>> at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
>> 
>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>> 
>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>> 
>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>> 
>> at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>> 
>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>> 
>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>> 
>> at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>> 
>> at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>> 
>> at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>> 
>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>> 
>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>> 
>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> 
>> at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> 
>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> 
>> at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> 
>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>> 
>> at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>> 
>> at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
>> 
>> ... 31 more
>> 
>> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>> 
>> at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
>> 
>> at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
>> 
>> at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
>> 
>> at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
>> 
>> at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
>> 
>> at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
>> 
>> at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
>> 
>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> 
>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> 
>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> 
>> at java.lang.reflect.Method.invoke(Method.java:498)
>> 
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
>> 
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
>> 
>> at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>> 
>> at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>> 
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> 
>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>> 
>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> 
>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> 
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>> 
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> 
>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> 
>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>> 
>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>> 
>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> 
>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> 
>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> 
>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> 
>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> 
>> ... 4 more
>> 
>> Caused by: java.lang.IllegalArgumentException: open() failed.
>> 
>> at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.open(JDBCOutputFormat.java:74)
>> 
>> at org.apache.flink.api.java.io.jdbc.JDBCSinkFunction.open(JDBCSinkFunction.java:55)
>> 
>> at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>> 
>> at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>> 
>> at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
>> 
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
>> 
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>> 
>> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>> 
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>> 
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>> 
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>> 
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>> 
>> at java.lang.Thread.run(Thread.java:745)
>> 
>> Caused by: java.sql.SQLException: Access denied for user ''@'localhost' (using password: NO)
>> 
>> at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
>> 
>> at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976)
>> 
>> at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912)
>> 
>> at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:871)
>> 
>> at com.mysql.jdbc.MysqlIO.proceedHandshakeWithPluggableAuthentication(MysqlIO.java:1714)
>> 
>> at com.mysql.jdbc.MysqlIO.doHandshake(MysqlIO.java:1224)
>> 
>> at com.mysql.jdbc.ConnectionImpl.coreConnect(ConnectionImpl.java:2190)
>> 
>> at com.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2221)
>> 
>> at com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2016)
>> 
>> at com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:776)
>> 
>> at com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:47)
>> 
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>> 
>> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>> 
>> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>> 
>> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>> 
>> at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
>> 
>> at com.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:386)
>> 
>> at com.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:330)
>> 
>> at java.sql.DriverManager.getConnection(DriverManager.java:664)
>> 
>> at java.sql.DriverManager.getConnection(DriverManager.java:270)
>> 
>> at org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.establishConnection(AbstractJDBCOutputFormat.java:68)
>> 
>> at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.open(JDBCOutputFormat.java:71)
>> 
>> ... 12 more
>> 
>> 
>> 
>> 
>> query:
>>   val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>> val tableEnv = TableEnvironment.create(settings)
>> 
>>       tableEnv.sqlUpdate(
>> """
>>           |
>>           |CREATE TABLE user_log (
>>           |    id INT
>>           |) WITH (
>>           |    'connector.type' = 'kafka',
>>           |    'connector.version' = 'universal',
>>           |    'connector.topic' = 'num',
>>           |    'connector.startup-mode' = 'latest-offset',
>>           |    'connector.properties.0.key' = 'zookeeper.connect',
>>           |    'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>>           |    'connector.properties.1.key' = 'bootstrap.servers',
>>           |    'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>>           |    'update-mode' = 'append',
>>           |    'format.type' = 'json',
>>           |    'format.derive-schema' = 'true'
>>           |)
>>           |""".stripMargin)
>> 
>> val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
>>     .setDrivername("com.mysql.jdbc.Driver")
>>     .setDBUrl("jdbc:mysql://localhost:3306/dashboard")
>>      .setQuery("INSERT INTO t2(id) VALUES (?)")
>> .setParameterTypes(BasicTypeInfo.INT_TYPE_INFO)
>>     .build()
>> 
>>   tableEnv.registerTableSink(
>> "jdbcOutputTable",
>> // specify table schema
>> Array[String]("id"),
>> Array[TypeInformation[_]](Types.INT),
>> sink)
>> 
>>   tableEnv.sqlQuery("select id from user_log")
>>     .insertInto("jdbcOutputTable")
>> 
>>   tableEnv.execute("from kafka sink mysql")


Re:回复: sink mysql 失败

Posted by Zhou Zach <wa...@163.com>.
感谢回复!忘记设置用户名和密码了。。

















At 2020-06-10 16:54:43, "wangweiguang@stevegame.cn" <wa...@stevegame.cn> wrote:
>
>Caused by: java.sql.SQLException: Access denied for user ''@'localhost' (using password: NO)
>得指定下有操作mysql这个表的权限账号了!
>
>
> 
>发件人: Zhou Zach
>发送时间: 2020-06-10 16:32
>收件人: Flink user-zh mailing list
>主题: sink mysql 失败
>SLF4J: Class path contains multiple SLF4J bindings.
> 
>SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> 
>SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> 
>SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
> 
>SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
> 
>ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
> 
>Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
> 
>Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
> 
>Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
> 
>Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
> 
>Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
> 
>Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
> 
>Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
> 
>Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
> 
>Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: c53425963e2c88e84a2e699732565e1a)
> 
>at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> 
>at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> 
>at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)
> 
>at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
> 
>at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
> 
>at org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
> 
>at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
> 
>at org.rabbit.streaming.kafka.FromKafkaSinkJdbcByJDBCAppendTableSink$.main(FromKafkaSinkJdbcByJDBCAppendTableSink.scala:74)
> 
>at org.rabbit.streaming.kafka.FromKafkaSinkJdbcByJDBCAppendTableSink.main(FromKafkaSinkJdbcByJDBCAppendTableSink.scala)
> 
>Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: c53425963e2c88e84a2e699732565e1a)
> 
>at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
> 
>at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
> 
>at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> 
>at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> 
>at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> 
>at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
> 
>at akka.dispatch.OnComplete.internal(Future.scala:264)
> 
>at akka.dispatch.OnComplete.internal(Future.scala:261)
> 
>at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> 
>at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> 
>at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> 
>at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
> 
>at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> 
>at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> 
>at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
> 
>at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
> 
>at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> 
>at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
> 
>at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
> 
>at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> 
>at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> 
>at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
> 
>at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> 
>at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> 
>at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> 
>at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
> 
>at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> 
>at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> 
>at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 
>at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> 
>at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 
>at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 
>Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> 
>at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
> 
>at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
> 
>... 31 more
> 
>Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
> 
>at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
> 
>at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
> 
>at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
> 
>at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
> 
>at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
> 
>at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
> 
>at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
> 
>at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 
>at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 
>at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 
>at java.lang.reflect.Method.invoke(Method.java:498)
> 
>at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
> 
>at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
> 
>at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> 
>at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> 
>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> 
>at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> 
>at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> 
>at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> 
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> 
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> 
>at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> 
>at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> 
>at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> 
>at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> 
>at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> 
>at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> 
>at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> 
>at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> 
>... 4 more
> 
>Caused by: java.lang.IllegalArgumentException: open() failed.
> 
>at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.open(JDBCOutputFormat.java:74)
> 
>at org.apache.flink.api.java.io.jdbc.JDBCSinkFunction.open(JDBCSinkFunction.java:55)
> 
>at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> 
>at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> 
>at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
> 
>at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
> 
>at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
> 
>at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> 
>at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
> 
>at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
> 
>at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> 
>at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> 
>at java.lang.Thread.run(Thread.java:745)
> 
>Caused by: java.sql.SQLException: Access denied for user ''@'localhost' (using password: NO)
> 
>at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
> 
>at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976)
> 
>at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912)
> 
>at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:871)
> 
>at com.mysql.jdbc.MysqlIO.proceedHandshakeWithPluggableAuthentication(MysqlIO.java:1714)
> 
>at com.mysql.jdbc.MysqlIO.doHandshake(MysqlIO.java:1224)
> 
>at com.mysql.jdbc.ConnectionImpl.coreConnect(ConnectionImpl.java:2190)
> 
>at com.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2221)
> 
>at com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2016)
> 
>at com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:776)
> 
>at com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:47)
> 
>at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> 
>at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> 
>at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> 
>at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> 
>at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
> 
>at com.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:386)
> 
>at com.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:330)
> 
>at java.sql.DriverManager.getConnection(DriverManager.java:664)
> 
>at java.sql.DriverManager.getConnection(DriverManager.java:270)
> 
>at org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.establishConnection(AbstractJDBCOutputFormat.java:68)
> 
>at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.open(JDBCOutputFormat.java:71)
> 
>... 12 more
> 
> 
> 
> 
>query:
>    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>val tableEnv = TableEnvironment.create(settings)
> 
>        tableEnv.sqlUpdate(
>"""
>            |
>            |CREATE TABLE user_log (
>            |    id INT
>            |) WITH (
>            |    'connector.type' = 'kafka',
>            |    'connector.version' = 'universal',
>            |    'connector.topic' = 'num',
>            |    'connector.startup-mode' = 'latest-offset',
>            |    'connector.properties.0.key' = 'zookeeper.connect',
>            |    'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
>            |    'connector.properties.1.key' = 'bootstrap.servers',
>            |    'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
>            |    'update-mode' = 'append',
>            |    'format.type' = 'json',
>            |    'format.derive-schema' = 'true'
>            |)
>            |""".stripMargin)
> 
>val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
>      .setDrivername("com.mysql.jdbc.Driver")
>      .setDBUrl("jdbc:mysql://localhost:3306/dashboard")
>       .setQuery("INSERT INTO t2(id) VALUES (?)")
>.setParameterTypes(BasicTypeInfo.INT_TYPE_INFO)
>      .build()
> 
>    tableEnv.registerTableSink(
>"jdbcOutputTable",
>// specify table schema
>Array[String]("id"),
>Array[TypeInformation[_]](Types.INT),
>sink)
> 
>    tableEnv.sqlQuery("select id from user_log")
>      .insertInto("jdbcOutputTable")
> 
>    tableEnv.execute("from kafka sink mysql")

回复: sink mysql 失败

Posted by "wangweiguang@stevegame.cn" <wa...@stevegame.cn>.
Caused by: java.sql.SQLException: Access denied for user ''@'localhost' (using password: NO)
得指定下有操作mysql这个表的权限账号了!


 
发件人: Zhou Zach
发送时间: 2020-06-10 16:32
收件人: Flink user-zh mailing list
主题: sink mysql 失败
SLF4J: Class path contains multiple SLF4J bindings.
 
SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/apache/logging/log4j/log4j-slf4j-impl/2.4.1/log4j-slf4j-impl-2.4.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 
SLF4J: Found binding in [jar:file:/Users/Zach/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
 
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
 
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
 
ERROR StatusLogger No log4j2 configuration file found. Using default configuration: logging only errors to the console.
 
Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
 
Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
 
Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
 
Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
 
Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
 
Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
 
Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
 
Wed Jun 10 16:27:09 CST 2020 WARN: Establishing SSL connection without server's identity verification is not recommended. According to MySQL 5.5.45+, 5.6.26+ and 5.7.6+ requirements SSL connection must be established by default if explicit option isn't set. For compliance with existing applications not using SSL the verifyServerCertificate property is set to 'false'. You need either to explicitly disable SSL by setting useSSL=false, or set useSSL=true and provide truststore for server certificate verification.
 
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: c53425963e2c88e84a2e699732565e1a)
 
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
 
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
 
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)
 
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
 
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
 
at org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
 
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
 
at org.rabbit.streaming.kafka.FromKafkaSinkJdbcByJDBCAppendTableSink$.main(FromKafkaSinkJdbcByJDBCAppendTableSink.scala:74)
 
at org.rabbit.streaming.kafka.FromKafkaSinkJdbcByJDBCAppendTableSink.main(FromKafkaSinkJdbcByJDBCAppendTableSink.scala)
 
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: c53425963e2c88e84a2e699732565e1a)
 
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
 
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
 
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
 
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
 
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
 
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
 
at akka.dispatch.OnComplete.internal(Future.scala:264)
 
at akka.dispatch.OnComplete.internal(Future.scala:261)
 
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
 
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
 
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
 
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
 
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
 
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
 
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
 
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
 
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
 
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
 
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
 
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
 
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
 
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
 
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
 
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
 
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
 
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
 
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
 
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
 
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
 
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
 
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
 
... 31 more
 
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
 
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
 
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
 
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
 
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
 
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
 
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
 
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 
at java.lang.reflect.Method.invoke(Method.java:498)
 
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
 
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
 
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
 
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
 
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
 
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
 
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
 
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
 
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
 
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
 
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
 
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
 
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
 
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
 
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
 
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
 
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
 
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
 
... 4 more
 
Caused by: java.lang.IllegalArgumentException: open() failed.
 
at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.open(JDBCOutputFormat.java:74)
 
at org.apache.flink.api.java.io.jdbc.JDBCSinkFunction.open(JDBCSinkFunction.java:55)
 
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
 
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
 
at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
 
at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1007)
 
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
 
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
 
at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
 
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
 
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
 
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
 
at java.lang.Thread.run(Thread.java:745)
 
Caused by: java.sql.SQLException: Access denied for user ''@'localhost' (using password: NO)
 
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
 
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976)
 
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912)
 
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:871)
 
at com.mysql.jdbc.MysqlIO.proceedHandshakeWithPluggableAuthentication(MysqlIO.java:1714)
 
at com.mysql.jdbc.MysqlIO.doHandshake(MysqlIO.java:1224)
 
at com.mysql.jdbc.ConnectionImpl.coreConnect(ConnectionImpl.java:2190)
 
at com.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2221)
 
at com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2016)
 
at com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:776)
 
at com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:47)
 
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
 
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
 
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
 
at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
 
at com.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:386)
 
at com.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:330)
 
at java.sql.DriverManager.getConnection(DriverManager.java:664)
 
at java.sql.DriverManager.getConnection(DriverManager.java:270)
 
at org.apache.flink.api.java.io.jdbc.AbstractJDBCOutputFormat.establishConnection(AbstractJDBCOutputFormat.java:68)
 
at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.open(JDBCOutputFormat.java:71)
 
... 12 more
 
 
 
 
query:
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val tableEnv = TableEnvironment.create(settings)
 
        tableEnv.sqlUpdate(
"""
            |
            |CREATE TABLE user_log (
            |    id INT
            |) WITH (
            |    'connector.type' = 'kafka',
            |    'connector.version' = 'universal',
            |    'connector.topic' = 'num',
            |    'connector.startup-mode' = 'latest-offset',
            |    'connector.properties.0.key' = 'zookeeper.connect',
            |    'connector.properties.0.value' = 'cdh1:2181,cdh2:2181,cdh3:2181',
            |    'connector.properties.1.key' = 'bootstrap.servers',
            |    'connector.properties.1.value' = 'cdh1:9092,cdh2:9092,cdh3:9092',
            |    'update-mode' = 'append',
            |    'format.type' = 'json',
            |    'format.derive-schema' = 'true'
            |)
            |""".stripMargin)
 
val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
      .setDrivername("com.mysql.jdbc.Driver")
      .setDBUrl("jdbc:mysql://localhost:3306/dashboard")
       .setQuery("INSERT INTO t2(id) VALUES (?)")
.setParameterTypes(BasicTypeInfo.INT_TYPE_INFO)
      .build()
 
    tableEnv.registerTableSink(
"jdbcOutputTable",
// specify table schema
Array[String]("id"),
Array[TypeInformation[_]](Types.INT),
sink)
 
    tableEnv.sqlQuery("select id from user_log")
      .insertInto("jdbcOutputTable")
 
    tableEnv.execute("from kafka sink mysql")