Posted to user-zh@flink.apache.org by yuess_coder <64...@qq.com> on 2019/03/07 01:23:14 UTC

sql-client error in batch execution mode

I submitted a job in sql-client:


create table csv_source1(
id varchar,
name varchar
) with (
type ='csv',
path = '/Users/IdeaProjects/github/apache-flink/build-target/bin/test1.csv'
);




create table csv_sink(
id varchar,
name varchar
) with (
type ='csv',
path = '/Users/IdeaProjects/github/apache-flink/build-target/bin/test4.csv'
);


insert into csv_sink  select t1.name,t1.id from csv_source1 t1




The error is a NullPointerException at line 1300 of org.apache.flink.table.api.TableEnvironment. It fails with execution type batch, while execution type streaming works fine. How can I execute this SQL in batch mode?

Re: sql-client error in batch execution mode

Posted by Zhenghua Gao <do...@gmail.com>.
I debugged this, and batch mode does currently have a problem: CsvTableFactory implements
BatchCompatibleTableSinkFactory rather than BatchTableSinkFactory.

/**
 * A CSV table factory.
 */
public class CsvTableFactory implements
   StreamTableSourceFactory<BaseRow>,
   BatchTableSourceFactory<BaseRow>,
   StreamTableSinkFactory<Object>,
   BatchCompatibleTableSinkFactory<Object> {

However, in ExternalTableUtil.scala (line 111), when looking up the matching factory via Java SPI,
the generic parameter we pass in is BatchTableSinkFactory.

/**
  * Converts a [[CatalogTable]] instance to a [[TableSink]] instance
  *
  * @param name          name of the table
  * @param externalTable the [[CatalogTable]] instance to convert
  * @param isStreaming   is in streaming mode or not.
  * @return the converted [[TableSink]] instance
  */
def toTableSink(
                 name: String,
                 externalTable: CatalogTable,
                 isStreaming: Boolean): TableSink[_] = {

  val tableProperties: TableProperties =
    generateTableProperties(name, externalTable, isStreaming)
  if (isStreaming) {
    val tableFactory = TableFactoryService.find(classOf[StreamTableSinkFactory[_]],
      getToolDescriptor(getStorageType(name, tableProperties), tableProperties))
    tableFactory.createStreamTableSink(tableProperties.toKeyLowerCase.toMap)
  } else {
    val tableFactory = TableFactoryService.find(classOf[BatchTableSinkFactory[_]],
      getToolDescriptor(getStorageType(name, tableProperties), tableProperties))
    tableFactory.createBatchTableSink(tableProperties.toKeyLowerCase.toMap)
  }
}

As a result, the CsvTableFactory we want cannot be found, and a NoMatchingTableFactoryException is thrown.
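The mismatch can be reproduced in miniature. The sketch below is a self-contained model (the interfaces and the `find` helper are stand-ins, not Flink's real classes) showing that a lookup filtered on one factory interface never matches a class that only implements a sibling interface:

```java
import java.util.List;
import java.util.Optional;

public class LookupDemo {
    // Stand-ins for the real Flink factory interfaces.
    interface TableFactory {}
    interface BatchTableSinkFactory extends TableFactory {}
    interface BatchCompatibleTableSinkFactory extends TableFactory {}

    // Stand-in for CsvTableFactory: implements only the "compatible" variant.
    static class CsvFactory implements BatchCompatibleTableSinkFactory {}

    // Stand-in for TableFactoryService.find: filters the discovered
    // factories by the requested interface, as the SPI-based lookup does.
    static <T extends TableFactory> Optional<T> find(
            Class<T> clazz, List<TableFactory> discovered) {
        return discovered.stream()
                .filter(clazz::isInstance)
                .map(clazz::cast)
                .findFirst();
    }

    public static void main(String[] args) {
        List<TableFactory> discovered = List.of(new CsvFactory());
        // The batch path asks for BatchTableSinkFactory and finds nothing...
        System.out.println(find(BatchTableSinkFactory.class, discovered).isPresent());
        // ...even though the factory is registered under the sibling interface.
        System.out.println(find(BatchCompatibleTableSinkFactory.class, discovered).isPresent());
    }
}
```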

Our original intent in introducing BatchCompatibleTableSinkFactory was to reuse some connector code
(batch reusing stream), but it is now clear there are cases we did not consider, and we are looking
into how to fix this. A quick fix here is to follow the approach in TableFactoryUtil (lines 97-108):
when no BatchTableSinkFactory is found, do a second lookup round with BatchCompatibleTableSinkFactory.
For a more elegant fix, please wait for our patch.
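The quick fix described above, a second lookup round when the first fails, can be sketched like this. Again this is a self-contained model with stand-in names, not the real TableFactoryService API:

```java
import java.util.List;

public class FallbackLookupDemo {
    // Stand-ins for the real Flink factory interfaces and exception.
    interface TableFactory {}
    interface BatchTableSinkFactory extends TableFactory {}
    interface BatchCompatibleTableSinkFactory extends TableFactory {}

    static class NoMatchingTableFactoryException extends RuntimeException {}

    // Stand-in for CsvTableFactory: implements only the "compatible" variant.
    static class CsvFactory implements BatchCompatibleTableSinkFactory {}

    // Stand-in for TableFactoryService.find: throws when nothing matches.
    static <T extends TableFactory> T find(Class<T> clazz, List<TableFactory> discovered) {
        return discovered.stream()
                .filter(clazz::isInstance)
                .map(clazz::cast)
                .findFirst()
                .orElseThrow(NoMatchingTableFactoryException::new);
    }

    // The quick fix: fall back to the "compatible" interface on a miss.
    static TableFactory findBatchSinkFactory(List<TableFactory> discovered) {
        try {
            return find(BatchTableSinkFactory.class, discovered);
        } catch (NoMatchingTableFactoryException e) {
            return find(BatchCompatibleTableSinkFactory.class, discovered);
        }
    }

    public static void main(String[] args) {
        TableFactory f = findBatchSinkFactory(List.of(new CsvFactory()));
        System.out.println(f.getClass().getSimpleName()); // CsvFactory
    }
}
```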

As for why you are getting an NPE here, you may need to debug it remotely; I am not sure whether your code has been modified.

On Thu, Mar 7, 2019 at 5:28 PM yuess_coder <64...@qq.com> wrote:


Re: sql-client error in batch execution mode

Posted by yuess_coder <64...@qq.com>.
Only the part highlighted in red was changed. Please help check whether anything else is wrong; if that still doesn't work, I'll have to debug it.


# Define tables here such as sources, sinks, views, or temporal tables.


tables: [] # empty list


# Define scalar, aggregate, or table functions here.


functions: [] # empty list




# Execution properties allow for changing the behavior of a table program.


execution:
  # 'batch' or 'streaming' execution
  type: batch
  # allow 'event-time' or only 'processing-time' in sources
  time-characteristic: event-time
  # interval in ms for emitting periodic watermarks
  periodic-watermarks-interval: 200
  # 'changelog' or 'table' presentation of results
  result-mode: table
  # maximum number of maintained rows in 'table' presentation of results
  max-table-result-rows: 1000000
  # parallelism of the program
  parallelism: 1
  # maximum parallelism
  max-parallelism: 128
  # minimum idle state retention in ms
  min-idle-state-retention: 0
  # maximum idle state retention in ms
  max-idle-state-retention: 0
  # controls how table programs are restarted in case of failures
  restart-strategy:
    # strategy type
    # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
    type: fallback




#==============================================================================
# Deployment properties
#==============================================================================


# Deployment properties allow for describing the cluster to which table
# programs are submitted.


deployment:
  # general cluster communication timeout in ms
  response-timeout: 5000
  # (optional) address from cluster to gateway
  gateway-address: ""
  # (optional) port from cluster to gateway
  gateway-port: 0




#==============================================================================
# Catalog properties
#==============================================================================
#catalogs:
#  - name: myhive
#    catalog:
#      type: hive
#      connector:
#        hive.metastore.uris: thrift://localhost:9083
#      is-default: false
#      default-database: default




Re: sql-client error in batch execution mode

Posted by Zhenghua Gao <do...@gmail.com>.
It looks like tableSink became null for some reason; this needs debugging.
Could you share your conf/sql-client-defaults.yaml?

On Thu, Mar 7, 2019 at 1:31 PM yuess_coder <64...@qq.com> wrote:


Re: sql-client error in batch execution mode

Posted by yuess_coder <64...@qq.com>.
The error log is:




java.lang.NullPointerException
  at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:1300)
  at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:1203)
  at org.apache.flink.table.api.TableEnvironment.sqlUpdate(TableEnvironment.scala:1235)
  at org.apache.flink.table.client.gateway.local.LocalExecutor.lambda$applyUpdate$10(LocalExecutor.java:605)
  at org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:181)
  at org.apache.flink.table.client.gateway.local.LocalExecutor.applyUpdate(LocalExecutor.java:603)
  at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdateInternal(LocalExecutor.java:508)
  at org.apache.flink.table.client.gateway.local.LocalExecutor.executeUpdate(LocalExecutor.java:342)
  at org.apache.flink.table.client.cli.CliClient.callInsertInto(CliClient.java:483)
  at org.apache.flink.table.client.cli.CliClient.callCommand(CliClient.java:286)
  at java.util.Optional.ifPresent(Optional.java:159)
  at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:174)
  at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:125)
  at org.apache.flink.table.client.SqlClient.start(SqlClient.java:107)
  at org.apache.flink.table.client.SqlClient.main(SqlClient.java:202)





val sinkFieldTypes = tableSink
      .getFieldTypes
      .map(_.toInternalType)

The null is reported here. Thanks for the answer.



Re: sql-client error in batch execution mode

Posted by Kurt Young <yk...@gmail.com>.
Hi, the image doesn't show up. Could you paste it as text?

Best,
Kurt


On Thu, Mar 7, 2019 at 9:23 AM yuess_coder <64...@qq.com> wrote:
