Posted to commits@hudi.apache.org by "张超明 (Jira)" <ji...@apache.org> on 2021/02/08 02:30:00 UTC
[jira] [Updated] (HUDI-1596) Throw NPE while reading configuration of parameter 'path'.
[ https://issues.apache.org/jira/browse/HUDI-1596?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
张超明 updated HUDI-1596:
----------------------
Summary: Throw NPE while reading configuration of parameter 'path'. (was: NPE while reading configuration of parameter 'path'.)
> Throw NPE while reading configuration of parameter 'path'.
> ----------------------------------------------------------
>
> Key: HUDI-1596
> URL: https://issues.apache.org/jira/browse/HUDI-1596
> Project: Apache Hudi
> Issue Type: Bug
> Components: Flink Integration
> Affects Versions: 0.8.0
> Reporter: 张超明
> Priority: Blocker
>
> h2. *Problem*
> The program throws an NPE while initializing writeClient.
> {code:java}
> public void open(Configuration parameters) throws Exception {
>   super.open(parameters);
>   // Get configs from runtimeContext
>   cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>   writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism();
>   // writeClient
>   writeClient = new HoodieFlinkWriteClient<>(new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)), StreamerUtil.getHoodieClientConfig(cfg));
> }
> {code}
> `cfg` does not have the option 'path', which causes the NPE. The exception occurs at `.withPath(conf.getString(FlinkOptions.PATH))`.
> {code:java}
> public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
>   HoodieWriteConfig.Builder builder =
>       HoodieWriteConfig.newBuilder()
>           .withEngineType(EngineType.FLINK)
>           .withPath(conf.getString(FlinkOptions.PATH))
>           .combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true)
>           .withCompactionConfig(
>               HoodieCompactionConfig.newBuilder()
>                   .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS))
>                   .build())
>           .forTable(conf.getString(FlinkOptions.TABLE_NAME))
>           .withAutoCommit(false)
>           .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));
>   builder = builder.withSchema(getSourceSchema(conf).toString());
>   return builder.build();
> }{code}
> h2. *Solution*
> Modify `open()` as below:
> {code:java}
> public void open(Configuration parameters) throws Exception {
>   super.open(parameters);
>   // Get configs from runtimeContext
>   cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>   writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism();
>   // writeClient
>   final Configuration conf = FlinkOptions.fromStreamerConfig(cfg); // Changes here
>   final HoodieWriteConfig hoodieClientConfig = StreamerUtil.getHoodieClientConfig(conf);
>   final HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null));
>   writeClient = new HoodieFlinkWriteClient<>(context, hoodieClientConfig);
> }
> {code}
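The failure mode described above can be reproduced in isolation. The following is a minimal sketch, not Hudi or Flink code: it assumes that looking up an option that was never set returns null, and that a later builder step rejects a null path. The class MissingPathSketch, its helper methods, the map-based config, and the "/tmp/hudi_table" value are all hypothetical stand-ins for illustration only.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class MissingPathSketch {

    // Hypothetical stand-in for a config lookup: returns null when the key is absent.
    public static String getString(Map<String, String> conf, String key) {
        return conf.get(key);
    }

    // Hypothetical stand-in for a builder step that rejects a null path.
    public static String withPath(String path) {
        return Objects.requireNonNull(path, "path must not be null");
    }

    // Hypothetical conversion step: copies the streamer's target path into the 'path' option,
    // playing the role that FlinkOptions.fromStreamerConfig(cfg) plays in the proposed fix.
    public static Map<String, String> fromStreamerConfig(String targetBasePath) {
        Map<String, String> conf = new HashMap<>();
        conf.put("path", targetBasePath);
        return conf;
    }

    public static void main(String[] args) {
        // Before the fix: the config handed to the builder has no 'path' entry.
        Map<String, String> raw = new HashMap<>();
        try {
            withPath(getString(raw, "path"));
        } catch (NullPointerException e) {
            System.out.println("NPE: " + e.getMessage());
        }

        // After the fix: the config is converted first, so 'path' is populated.
        Map<String, String> converted = fromStreamerConfig("/tmp/hudi_table");
        System.out.println("path = " + withPath(getString(converted, "path")));
    }
}
```

The sketch only shows the shape of the problem: the null originates at the lookup, but the exception surfaces one step later, which is why converting the config before building the write config avoids it.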
--
This message was sent by Atlassian Jira
(v8.3.4#803005)