Posted to issues@flink.apache.org by "Jark Wu (Jira)" <ji...@apache.org> on 2020/01/02 12:25:00 UTC

[jira] [Resolved] (FLINK-15383) Using sink Schema field name instead of query Schema field name in UpsertStreamTableSink

     [ https://issues.apache.org/jira/browse/FLINK-15383?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jark Wu resolved FLINK-15383.
-----------------------------
    Resolution: Fixed

1.11.0: 3843e167a9bd47d891e82d12e8ae609185ebfbfe
1.10.0: 4a63854bc054c859b94d054d82371d100e3e5ab8

> Using sink Schema field name instead of query Schema field name in UpsertStreamTableSink
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-15383
>                 URL: https://issues.apache.org/jira/browse/FLINK-15383
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Legacy Planner, Table SQL / Planner
>    Affects Versions: 1.10.0
>            Reporter: Leonard Xu
>            Assignee: Leonard Xu
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.10.0
>
>          Time Spent: 50m
>  Remaining Estimate: 0h
>
> When a user defines an upsert table whose underlying storage (e.g. MySQL) has a primary key:
> {code:java}
> // primary key (log_per_min, item, currency_time)
> String sinkTableDDL =  "CREATE TABLE gmv (\n" +
>         "  log_per_min STRING,\n" +
>         "  item STRING,\n" +
>         "  order_cnt BIGINT,\n" +
>         "  currency_time TIMESTAMP(3),\n" +
>         "  gmv DECIMAL(38, 18),\n" +
>         "  timestamp9 TIMESTAMP(6),\n" +
>         "  time9 TIME(6),\n" +
>         "  gdp DECIMAL(8, 4)\n" +
>          ") WITH (\n" +
>         "   'connector.type' = 'jdbc',\n" +
>         "   'connector.url' = 'jdbc:mysql://localhost:3306/test',\n" +
>         "   'connector.username' = 'root',\n" +
>         "   'connector.table' = 'gmv',\n" +
>         "   'connector.driver' = 'com.mysql.jdbc.Driver',\n" +
>         "   'connector.write.flush.max-rows' = '5000', \n" +
>         "   'connector.write.flush.interval' = '2s', \n" +
>         "   'connector.write.max-retries' = '3'" +
>         ")";
> {code}
> If a field name in the user's query differs from the corresponding field name in the sink table, the job fails. For example, the query below selects a field `log_ts`, which does not match the sink field `log_per_min`:
> {code:java}
> String query = "insert into gmv \n" +
>         "select log_ts,\n" +
>         " item, COUNT(order_id) as order_cnt, currency_time, cast(sum(amount_kg) * max(rate) as DECIMAL(38, 4)) as gmv,\n" +
>         " max(timestamp9), max(time9), max(gdp) \n" +
>         " from ( \n" +
>         " select cast(o.ts as VARCHAR) as log_ts, o.item as item, o.order_id as order_id, c.currency_time as currency_time,\n" +
>         " o.amount_kg as amount_kg, c.rate as rate, c.timestamp9 as timestamp9, c.time9 as time9, c.gdp as gdp \n" +
>         " from orders as o \n" +
>         " join currency FOR SYSTEM_TIME AS OF o.proc_time c \n" +
>         " on o.currency = c.currency_name \n" +
>         " ) a group by log_ts, item, currency_time";
> {code}
> The query then fails at runtime with:
> {code:java}
> Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
> 	at org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.lambda$create$0(UpsertWriter.java:59)
> 	at java.util.stream.IntPipeline$3$1.accept(IntPipeline.java:233)
> 	at java.util.Spliterators$IntArraySpliterator.forEachRemaining(Spliterators.java:1032)
> 	at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
> 	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
> 	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
> 	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:545)
> 	at java.util.stream.AbstractPipeline.evaluateToArrayNode(AbstractPipeline.java:260)
> 	at java.util.stream.IntPipeline.toArray(IntPipeline.java:502)
> 	at org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.create(UpsertWriter.java:59)
> 	at org.apache.flink.api.java.io.jdbc.JDBCUpsertOutputFormat.open(JDBCUpsertOutputFormat.java:104)
> 	at org.apache.flink.api.java.io.jdbc.JDBCUpsertSinkFunction.open(JDBCUpsertSinkFunction.java:42)
> 	at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> 	at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
> 	at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1023)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
> 	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:702)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:527)
> 	at java.lang.Thread.run(Thread.java:748)
> {code}
> The root cause is that StreamExecSink should resolve field names against the sink schema rather than the query schema.
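A minimal sketch of the failure mode (class and method names here are hypothetical, not Flink's actual code): when key field indices are resolved against the query schema's field names instead of the sink schema's, a sink key field such as `log_per_min` is not found, `indexOf` returns -1, and that -1 is later used as an array index, producing the `ArrayIndexOutOfBoundsException` above.

```java
import java.util.Arrays;
import java.util.List;

public class KeyIndexSketch {

    // Resolve each key field name to its position in the given schema.
    // List.indexOf returns -1 for names that are absent from the schema,
    // which mirrors the bug: the -1 survives and is used as an index later.
    static int[] resolveKeyIndices(List<String> schemaFields, String[] keyFields) {
        return Arrays.stream(keyFields)
                .mapToInt(schemaFields::indexOf)
                .toArray();
    }

    public static void main(String[] args) {
        String[] keyFields = {"log_per_min", "item", "currency_time"};

        // Query schema uses "log_ts", so the sink key "log_per_min" is missing.
        List<String> querySchema =
                Arrays.asList("log_ts", "item", "order_cnt", "currency_time");
        System.out.println(Arrays.toString(resolveKeyIndices(querySchema, keyFields)));
        // -> [-1, 1, 3]  (the -1 later triggers ArrayIndexOutOfBoundsException)

        // Resolving against the sink schema yields valid indices, as the fix does.
        List<String> sinkSchema =
                Arrays.asList("log_per_min", "item", "order_cnt", "currency_time");
        System.out.println(Arrays.toString(resolveKeyIndices(sinkSchema, keyFields)));
        // -> [0, 1, 3]
    }
}
```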



--
This message was sent by Atlassian Jira
(v8.3.4#803005)