You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "张超明 (Jira)" <ji...@apache.org> on 2021/02/08 02:25:00 UTC
[jira] [Created] (HUDI-1596) NPE while reading configuration of
parameter 'path'.
张超明 created HUDI-1596:
-------------------------
Summary: NPE while reading configuration of parameter 'path'.
Key: HUDI-1596
URL: https://issues.apache.org/jira/browse/HUDI-1596
Project: Apache Hudi
Issue Type: Bug
Components: Flink Integration
Affects Versions: 0.8.0
Reporter: 张超明
h2. *Problem*
Program throws NPE during initilaize writeClient.
{code:java}
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Get configs from runtimeContext
cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism();
// writeClient
writeClient = new HoodieFlinkWriteClient<>(new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)), StreamerUtil.getHoodieClientConfig(cfg));
}
{code}
The `cfg` does not have option 'path', which cause NPE. Exception occured on ` .withPath(conf.getString(FlinkOptions.PATH))`.
{code:java}
public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
HoodieWriteConfig.Builder builder =
HoodieWriteConfig.newBuilder()
.withEngineType(EngineType.FLINK)
.withPath(conf.getString(FlinkOptions.PATH))
.combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true)
.withCompactionConfig(
HoodieCompactionConfig.newBuilder()
.withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS))
.build())
.forTable(conf.getString(FlinkOptions.TABLE_NAME))
.withAutoCommit(false)
.withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));
builder = builder.withSchema(getSourceSchema(conf).toString());
return builder.build();
}{code}
h2. *Solution*
Modify `open()` as below:
{code:java}
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// Get configs from runtimeContext
cfg = (FlinkStreamerConfig) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism();
// writeClient
final Configuration conf = FlinkOptions.fromStreamerConfig(cfg);
final HoodieWriteConfig hoodieClientConfig = StreamerUtil.getHoodieClientConfig(conf);
final HoodieFlinkEngineContext context = new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null));
writeClient = new HoodieFlinkWriteClient<>(context, hoodieClientConfig);
}
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)