Posted to commits@hudi.apache.org by "张超明 (Jira)" <ji...@apache.org> on 2021/02/08 02:25:00 UTC

[jira] [Created] (HUDI-1596) NPE while reading configuration of parameter 'path'.

张超明 created HUDI-1596:
-------------------------

             Summary: NPE while reading configuration of parameter 'path'.
                 Key: HUDI-1596
                 URL: https://issues.apache.org/jira/browse/HUDI-1596
             Project: Apache Hudi
          Issue Type: Bug
          Components: Flink Integration
    Affects Versions: 0.8.0
            Reporter: 张超明


h2. *Problem*

The program throws a NullPointerException (NPE) while initializing `writeClient` in `open()`:
{code:java}
public void open(Configuration parameters) throws Exception {
  super.open(parameters);
  // Get configs from runtimeContext
  cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();

  writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism();

  // writeClient
  writeClient = new HoodieFlinkWriteClient<>(new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)), StreamerUtil.getHoodieClientConfig(cfg));
}
{code}
`cfg` does not contain the 'path' option, which causes the NPE. The exception occurs at `.withPath(conf.getString(FlinkOptions.PATH))` in `StreamerUtil.getHoodieClientConfig`:
{code:java}
public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
  HoodieWriteConfig.Builder builder =
      HoodieWriteConfig.newBuilder()
          .withEngineType(EngineType.FLINK)
          .withPath(conf.getString(FlinkOptions.PATH))
          .combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true)
          .withCompactionConfig(
              HoodieCompactionConfig.newBuilder()
                  .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS))
                  .build())
          .forTable(conf.getString(FlinkOptions.TABLE_NAME))
          .withAutoCommit(false)
          .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));

  builder = builder.withSchema(getSourceSchema(conf).toString());
  return builder.build();
}
{code}
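
The failure mode can be reproduced without Hudi or Flink. The following is a minimal sketch, not the actual Hudi code path: `MissingPathSketch`, `getString`, `withPath`, and the `base.path` key are hypothetical stand-ins, and it assumes the builder ends up storing the path in a `java.util.Properties` object, which rejects null values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class MissingPathSketch {

  /** Hypothetical stand-in for a config lookup: returns null when the option was never set. */
  public static String getString(Map<String, String> options, String key) {
    return options.get(key);
  }

  /** Hypothetical stand-in for a builder's withPath(): assumed to store the value in Properties. */
  public static Properties withPath(Properties props, String path) {
    // java.util.Properties does not accept null values and throws NullPointerException.
    props.setProperty("base.path", path);
    return props;
  }

  /** Returns true if building the properties from the given options throws an NPE. */
  public static boolean throwsNpe(Map<String, String> options) {
    try {
      withPath(new Properties(), getString(options, "path"));
      return false;
    } catch (NullPointerException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    Map<String, String> withoutPath = new HashMap<>();
    Map<String, String> withPath = new HashMap<>();
    withPath.put("path", "/tmp/hudi_table"); // illustrative value only

    System.out.println("missing 'path' throws NPE: " + throwsNpe(withoutPath));
    System.out.println("populated 'path' throws NPE: " + throwsNpe(withPath));
  }
}
```

The point of the sketch is only that a missing option surfaces as a null at the lookup and fails later, at the first place that refuses null.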
h2. *Solution*

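Modify `open()` so that `cfg` is first converted to a `Configuration` with `FlinkOptions.fromStreamerConfig(cfg)`, and the write config is built from that converted object: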
{code:java}
public void open(Configuration parameters) throws Exception {
  super.open(parameters);
  // Get configs from runtimeContext
  cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();

  writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism();

  // writeClient 
  final Configuration conf = FlinkOptions.fromStreamerConfig(cfg);
  final HoodieWriteConfig hoodieClientConfig = StreamerUtil.getHoodieClientConfig(conf);
  final HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null));

  writeClient = new HoodieFlinkWriteClient<>(context, hoodieClientConfig);
}
{code}
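
The idea behind the fix, converting a field-based config object into key-value options before any lookup, can be sketched in isolation. This is an illustration under stated assumptions, not the implementation of `FlinkOptions.fromStreamerConfig`: the `StreamerConfig` class, its field names, and the option keys below are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class ConvertConfigSketch {

  /** Hypothetical stand-in for a streamer config: values live in fields, not in a key-value map. */
  public static class StreamerConfig {
    public String targetBasePath;
    public String targetTableName;
  }

  /** Copies the field values into explicit key-value options (keys are illustrative). */
  public static Map<String, String> fromStreamerConfig(StreamerConfig cfg) {
    Map<String, String> conf = new HashMap<>();
    conf.put("path", cfg.targetBasePath);
    conf.put("table.name", cfg.targetTableName);
    return conf;
  }

  public static void main(String[] args) {
    StreamerConfig cfg = new StreamerConfig();
    cfg.targetBasePath = "/tmp/hudi_table"; // illustrative value only
    cfg.targetTableName = "t1";             // illustrative value only

    // After conversion, a lookup by option key finds the value that was set on the field.
    Map<String, String> conf = fromStreamerConfig(cfg);
    System.out.println("path = " + conf.get("path"));
  }
}
```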



--
This message was sent by Atlassian Jira
(v8.3.4#803005)