Posted to user-zh@flink.apache.org by yuess_coder <64...@qq.com> on 2019/03/07 05:31:46 UTC
Re: sql-client batch mode execution error
The error log is:
java.lang.NullPointerException
at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:1300)
at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:1203)
at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:1235)
at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$applyUpdate$10(LocalExecutor.java:605)
at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:181)
at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:603)
at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:508)
at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:342)
at org.apache.flink.table.client.cli.CliClient.callInsertInto(CliClient.java:483)
at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:286)
at java.util.Optional.ifPresent(Optional.java:159)
at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:174)
at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
at org.apache.flink.table.client.SqlClient.start(SqlClient.java:107)
at org.apache.flink.table.client.SqlClient.main(SqlClient.java:202)
val sinkFieldTypes = tableSink
  .getFieldTypes
  .map(_.toInternalType)
The null is reported here. Thanks in advance for your answer.
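For context, the NPE at that line means `tableSink` itself is null when `getFieldTypes` is invoked. The following is a minimal, self-contained model of that failure mode and of a fail-fast guard; all names here are illustrative, not Flink's actual API:

```java
import java.util.HashMap;
import java.util.Map;

public class SinkLookupDemo {
    // Illustrative registry: in streaming mode the lookup succeeds,
    // in batch mode nothing matches and the result is null.
    static final Map<String, String[]> SINKS = new HashMap<>();
    static {
        SINKS.put("csv_sink@streaming", new String[] {"VARCHAR", "VARCHAR"});
    }

    // Returns null when no sink matched -- the batch case in this thread.
    static String[] findFieldTypes(String key) {
        return SINKS.get(key);
    }

    static String describeSink(String key) {
        String[] fieldTypes = findFieldTypes(key);
        if (fieldTypes == null) {
            // Failing fast with context beats the bare NullPointerException
            // thrown when the null sink is dereferenced.
            throw new IllegalStateException("no table sink found for: " + key);
        }
        return key + " -> " + String.join(", ", fieldTypes);
    }
}
```

A guard like this would have surfaced "no table sink found" instead of the opaque stack trace above.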
------------------ Original message ------------------
From: "Kurt Young" <yk...@gmail.com>
Date: Thursday, March 7, 2019, 12:06 PM
To: "user-zh" <us...@flink.apache.org>
Subject: Re: sql-client batch mode execution error
Hi, the image doesn't come through. Could you paste it as text?
Best,
Kurt
On Thu, Mar 7, 2019 at 9:23 AM yuess_coder <64...@qq.com> wrote:
> I submitted a job in the sql-client:
>
> create table csv_source1(
> id varchar,
> name varchar
> ) with (
> type ='csv',
> path = '/Users/IdeaProjects/github/apache-flink/build-target/bin/test1.csv'
> );
>
>
> create table csv_sink(
> id varchar,
> name varchar
> ) with (
> type ='csv',
> path = '/Users/IdeaProjects/github/apache-flink/build-target/bin/test4.csv'
> );
>
> insert into csv_sink select t1.name,t1.id from csv_source1 t1
>
>
> The error is a NullPointerException at line 1300 of
> org.apache.flink.table.api.TableEnvironment. It fails with execution type
> batch, but works with execution type streaming. How can I run this SQL in
> batch mode?
>
>
>
Re: sql-client batch mode execution error
Posted by Zhenghua Gao <do...@gmail.com>.
I debugged this: there is indeed a problem in batch mode. The cause is that CsvTableFactory implements
BatchCompatibleTableSinkFactory rather than BatchTableSinkFactory.
/**
 * A CSV table factory.
 */
public class CsvTableFactory implements
    StreamTableSourceFactory<BaseRow>,
    BatchTableSourceFactory<BaseRow>,
    StreamTableSinkFactory<Object>,
    BatchCompatibleTableSinkFactory<Object> {
Meanwhile, in ExternalTableUtil.scala (line 111), where Java SPI is used to find the matching factory, the generic parameter we pass in is
BatchTableSinkFactory.
/**
 * Converts an [[CatalogTable]] instance to a [[TableSink]] instance
 *
 * @param name name of the table
 * @param externalTable the [[CatalogTable]] instance to convert
 * @param isStreaming is in streaming mode or not.
 * @return
 */
def toTableSink(
    name: String,
    externalTable: CatalogTable,
    isStreaming: Boolean): TableSink[_] = {
  val tableProperties: TableProperties =
    generateTableProperties(name, externalTable, isStreaming)
  if (isStreaming) {
    val tableFactory =
      TableFactoryService.find(classOf[StreamTableSinkFactory[_]],
        getToolDescriptor(getStorageType(name, tableProperties), tableProperties))
    tableFactory.createStreamTableSink(tableProperties.toKeyLowerCase.toMap)
  } else {
    val tableFactory =
      TableFactoryService.find(classOf[BatchTableSinkFactory[_]],
        getToolDescriptor(getStorageType(name, tableProperties), tableProperties))
    tableFactory.createBatchTableSink(tableProperties.toKeyLowerCase.toMap)
  }
}
As a result, the CsvTableFactory we want cannot be found, which then raises a NoMatchingTableFactoryException.
We introduced BatchCompatibleTableSinkFactory so that batch could reuse part of the
streaming connector code, but apparently some cases were not accounted for. We are considering how to correct this. A quick fix is to follow what
TableFactoryUtil (lines 97-108) does: when no BatchTableSinkFactory is found, search a second time for a
BatchCompatibleTableSinkFactory. For a more elegant fix, please wait for our patch.
As for why you are seeing an NPE instead, you may need to debug remotely; I'm not sure whether your code has been modified.
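The quick fix described above amounts to a two-step factory lookup. Below is a self-contained model of that pattern; the interfaces and the registry are stand-ins for illustration, not Flink's actual TableFactoryService API:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

public class FactoryFallbackDemo {
    interface TableFactory {}
    interface BatchTableSinkFactory extends TableFactory {}
    interface BatchCompatibleTableSinkFactory extends TableFactory {}

    // CsvTableFactory only implements the "compatible" interface, which is
    // why a lookup keyed on BatchTableSinkFactory alone finds nothing.
    static class CsvTableFactory implements BatchCompatibleTableSinkFactory {}

    static final List<TableFactory> REGISTERED =
        Arrays.asList(new CsvTableFactory());

    // Stand-in for an SPI lookup: first registered factory of the given type.
    static Optional<TableFactory> find(Class<? extends TableFactory> iface) {
        return REGISTERED.stream().filter(iface::isInstance).findFirst();
    }

    // The suggested fix: fall back to the compatible interface before failing.
    static TableFactory findBatchSinkFactory() {
        return find(BatchTableSinkFactory.class)
            .orElseGet(() -> find(BatchCompatibleTableSinkFactory.class)
                .orElseThrow(() ->
                    new IllegalStateException("NoMatchingTableFactory")));
    }
}
```

With the fallback in place, the batch-mode lookup resolves to CsvTableFactory instead of failing; without it, the first `find` comes back empty and the sink stays unresolved.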
Re: sql-client batch mode execution error
Posted by yuess_coder <64...@qq.com>.
Only the part in red was changed. Please help check whether anything else looks wrong; failing that, I'll have to debug.
# Define tables here such as sources, sinks, views, or temporal tables.

tables: [] # empty list

# Define scalar, aggregate, or table functions here.

functions: [] # empty list

# Execution properties allow for changing the behavior of a table program.

execution:
  # 'batch' or 'streaming' execution
  type: batch
  # allow 'event-time' or only 'processing-time' in sources
  time-characteristic: event-time
  # interval in ms for emitting periodic watermarks
  periodic-watermarks-interval: 200
  # 'changelog' or 'table' presentation of results
  result-mode: table
  # maximum number of maintained rows in 'table' presentation of results
  max-table-result-rows: 1000000
  # parallelism of the program
  parallelism: 1
  # maximum parallelism
  max-parallelism: 128
  # minimum idle state retention in ms
  min-idle-state-retention: 0
  # maximum idle state retention in ms
  max-idle-state-retention: 0
  # controls how table programs are restarted in case of failures
  restart-strategy:
    # strategy type
    # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
    type: fallback

#==============================================================================
# Deployment properties
#==============================================================================

# Deployment properties allow for describing the cluster to which table
# programs are submitted.

deployment:
  # general cluster communication timeout in ms
  response-timeout: 5000
  # (optional) address from cluster to gateway
  gateway-address: ""
  # (optional) port from cluster to gateway
  gateway-port: 0

#==============================================================================
# Catalog properties
#==============================================================================
#catalogs:
#  - name: myhive
#    catalog:
#      type: hive
#      connector:
#        hive.metastore.uris: thrift://localhost:9083
#    is-default: false
#    default-database: default
Re: sql-client batch mode execution error
Posted by Zhenghua Gao <do...@gmail.com>.
It looks like tableSink became null for some reason; this needs debugging.
Could you share your conf/sql-client-defaults.yaml?