Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2021/02/08 08:17:52 UTC

[GitHub] [hudi] wangxianghu commented on a change in pull request #2547: [HUDI-1596] NPE while getting option PATH from configuration.

wangxianghu commented on a change in pull request #2547:
URL: https://github.com/apache/hudi/pull/2547#discussion_r571849816



##########
File path: hudi-flink/src/main/java/org/apache/hudi/sink/CommitSink.java
##########
@@ -77,7 +79,11 @@ public void open(Configuration parameters) throws Exception {
     writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism();
 
     // writeClient
-    writeClient = new HoodieFlinkWriteClient<>(new HoodieFlinkEngineContext(new FlinkTaskContextSupplier(null)), StreamerUtil.getHoodieClientConfig(cfg));

Review comment:
       > You can see my changed files. It is obvious that the original version did not invoke `FlinkOptions.fromStreamerConfig(conf)` first.
   
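   @ZhangChaoming Did you update your local code to match master? It looks OK on my side; on master the `FlinkStreamerConfig` overload already delegates through `FlinkOptions.fromStreamerConfig(conf)`: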
   ```
     public static HoodieWriteConfig getHoodieClientConfig(FlinkStreamerConfig conf) {
       return getHoodieClientConfig(FlinkOptions.fromStreamerConfig(conf));
     }
   
     public static HoodieWriteConfig getHoodieClientConfig(Configuration conf) {
       HoodieWriteConfig.Builder builder =
           HoodieWriteConfig.newBuilder()
               .withEngineType(EngineType.FLINK)
               .withPath(conf.getString(FlinkOptions.PATH))
               .combineInput(conf.getBoolean(FlinkOptions.INSERT_DROP_DUPS), true)
               .withCompactionConfig(
                   HoodieCompactionConfig.newBuilder()
                       .withPayloadClass(conf.getString(FlinkOptions.PAYLOAD_CLASS))
                       .build())
               .forTable(conf.getString(FlinkOptions.TABLE_NAME))
               .withAutoCommit(false)
               .withProps(flinkConf2TypedProperties(FlinkOptions.flatOptions(conf)));
   
       builder = builder.withSchema(getSourceSchema(conf).toString());
       return builder.build();
     }
   ```
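
To make the delegation point concrete, here is a minimal, self-contained sketch of the same overload pattern. Every name in it (`ConfigDelegationSketch`, `StreamerConfig`, `KeyValueConf`, `describe`, and the string keys) is a hypothetical stand-in, not a Hudi or Flink class. It only illustrates that when the streamer-style overload converts its input first, the key/value overload finds the path option populated, whereas reading from a configuration that was never populated yields `null`. Whether that is what actually triggers the NPE in this PR is an assumption.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-ins for illustration only; none of these are Hudi or Flink classes.
final class ConfigDelegationSketch {

  static final String PATH = "path";
  static final String TABLE_NAME = "table.name";

  /** Stand-in for a streamer-style config that holds plain fields. */
  static final class StreamerConfig {
    final String targetBasePath;
    final String targetTableName;

    StreamerConfig(String targetBasePath, String targetTableName) {
      this.targetBasePath = targetBasePath;
      this.targetTableName = targetTableName;
    }
  }

  /** Stand-in for a key/value configuration; getString returns null when a key is unset. */
  static final class KeyValueConf {
    private final Map<String, String> values = new HashMap<>();

    void setString(String key, String value) {
      values.put(key, value);
    }

    String getString(String key) {
      return values.get(key);
    }
  }

  /** Copies the streamer-style fields into the key/value form. */
  static KeyValueConf fromStreamerConfig(StreamerConfig cfg) {
    KeyValueConf conf = new KeyValueConf();
    conf.setString(PATH, cfg.targetBasePath);
    conf.setString(TABLE_NAME, cfg.targetTableName);
    return conf;
  }

  /** Streamer-style overload: converts first, then delegates. */
  static String describe(StreamerConfig cfg) {
    return describe(fromStreamerConfig(cfg));
  }

  /** Key/value overload: fails fast with a clear message if an option is missing. */
  static String describe(KeyValueConf conf) {
    String path = Objects.requireNonNull(conf.getString(PATH), "option 'path' is not set");
    String table =
        Objects.requireNonNull(conf.getString(TABLE_NAME), "option 'table.name' is not set");
    return table + "@" + path;
  }
}
```

Calling `ConfigDelegationSketch.describe(new ConfigDelegationSketch.StreamerConfig("/tmp/hudi", "t1"))` goes through the conversion and succeeds, while calling `describe` on an empty `KeyValueConf` throws a `NullPointerException` that names the missing option.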



