Posted to commits@hudi.apache.org by "张超明 (Jira)" <ji...@apache.org> on 2021/02/08 02:31:00 UTC

[jira] [Updated] (HUDI-1596) Throw NPE while reading configuration of parameter 'PATH'.

     [ https://issues.apache.org/jira/browse/HUDI-1596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

张超明 updated HUDI-1596:
----------------------
    Summary: Throw NPE while reading configuration of parameter 'PATH'.  (was: Throw NPE while reading configuration of parameter 'path'.)

> Throw NPE while reading configuration of parameter 'PATH'.
> ----------------------------------------------------------
>
>                 Key: HUDI-1596
>                 URL: https://issues.apache.org/jira/browse/HUDI-1596
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: Flink Integration
>    Affects Versions: 0.8.0
>            Reporter: 张超明
>            Priority: Blocker
>
> h2. *Problem*
> The program throws an NPE while initializing writeClient.
> {code:java}
> public void open(Configuration parameters) throws Exception {
>   super.open(parameters);
>   // Get configs from runtimeContext
>   cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>   writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism();
>   // writeClient
>   writeClient = new HoodieFlinkWriteClient<>(new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)), StreamerUtil.getHoodieClientConfig(cfg));
> }
> {code}
> *`cfg` does not contain the 'PATH' option, which causes the NPE. The exception occurs at* `.withPath(conf.getString(FlinkOptions.PATH))`.
> {code:java}
> public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
>   HoodieWriteConfig.Builder builder =
>       HoodieWriteConfig.newBuilder()
>           .withEngineType(EngineType.FLINK)
>           .withPath(conf.getString(FlinkOptions.PATH))
>           .combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true)
>           .withCompactionConfig(
>               HoodieCompactionConfig.newBuilder()
>                   .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS))
>                   .build())
>           .forTable(conf.getString(FlinkOptions.TABLE_NAME))
>           .withAutoCommit(false)
>           .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));
>   builder = builder.withSchema(getSourceSchema(conf).toString());
>   return builder.build();
> }{code}
> h2. *Solution*
> Modify `open()` as follows:
> {code:java}
> public void open(Configuration parameters) throws Exception {
>   super.open(parameters);
>   // Get configs from runtimeContext
>   cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>   writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism();
>   // writeClient 
>   final Configuration conf = FlinkOptions.fromStreamerConfig(cfg); // Changes here
>   final HoodieWriteConfig hoodieClientConfig = StreamerUtil.getHoodieClientConfig(conf);
>   final HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null));
>   writeClient = new HoodieFlinkWriteClient<>(context, hoodieClientConfig);
> }
> {code}
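
The failure mode and the fix described above can be reproduced without Flink or Hudi on the classpath. The following is only a minimal sketch under stated assumptions: `MissingPathSketch`, `PATH_KEY`, `BASE_PATH_PROP`, `buildWriteProps`, and the one-argument `fromStreamerConfig` are hypothetical stand-ins, not the real Hudi or Flink API. It also assumes that the NPE comes from a missing option resolving to null and then being stored in a `java.util.Properties`, which rejects null values. The issue itself does not confirm where inside the builder the exception is raised.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class MissingPathSketch {
    // Hypothetical key names, used for illustration only.
    static final String PATH_KEY = "path";
    static final String BASE_PATH_PROP = "base.path";

    // Stand-in for a builder step such as withPath(...): it copies the looked-up
    // value into java.util.Properties, which throws an NPE on a null value.
    static Properties buildWriteProps(Map<String, String> conf) {
        Properties props = new Properties();
        props.setProperty(BASE_PATH_PROP, conf.get(PATH_KEY)); // null if the key is absent
        return props;
    }

    // Stand-in for a conversion step such as FlinkOptions.fromStreamerConfig(cfg):
    // it copies a streamer-style field into the key that the builder reads.
    static Map<String, String> fromStreamerConfig(String targetBasePath) {
        Map<String, String> conf = new HashMap<>();
        conf.put(PATH_KEY, targetBasePath);
        return conf;
    }

    public static void main(String[] args) {
        // Before the fix: the option was never populated, so the lookup yields null.
        try {
            buildWriteProps(new HashMap<>());
        } catch (NullPointerException e) {
            System.out.println("NPE: path option missing");
        }

        // After the fix: convert first, then build from the populated configuration.
        Properties ok = buildWriteProps(fromStreamerConfig("/tmp/hudi_table"));
        System.out.println(ok.getProperty(BASE_PATH_PROP));
    }
}
```

The sketch mirrors the shape of the proposed change: the configuration is converted into a form that actually carries the path option before it is handed to the code that reads it.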


