Posted to commits@hudi.apache.org by ga...@apache.org on 2020/11/19 07:28:03 UTC

[hudi] branch master updated: [MINOR] clean up and add comments to flink client (#2261)

This is an automated email from the ASF dual-hosted git repository.

garyli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c8d5ea2  [MINOR] clean up and add comments to flink client (#2261)
c8d5ea2 is described below

commit c8d5ea2752f8289b9dec37a149e82c030f657924
Author: Gary Li <ya...@gmail.com>
AuthorDate: Thu Nov 19 15:27:52 2020 +0800

    [MINOR] clean up and add comments to flink client (#2261)
---
 .../table/action/commit/AbstractMergeHelper.java   | 18 +++++++--------
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  5 ++++
 .../client/common/HoodieFlinkEngineContext.java    |  8 ++++---
 .../org/apache/hudi/index/FlinkHoodieIndex.java    |  2 +-
 .../hudi/table/HoodieFlinkCopyOnWriteTable.java    |  8 +++++++
 .../org/apache/hudi/table/HoodieFlinkTable.java    |  2 +-
 .../commit/BaseFlinkCommitActionExecutor.java      |  2 +-
 .../table/action/commit/FlinkDeleteHelper.java     |  2 +-
 .../table/action/commit/UpsertPartitioner.java     |  3 +++
 .../rollback/FlinkMarkerBasedRollbackStrategy.java |  3 ++-
 .../rollback/ListingBasedRollbackHelper.java       |  3 +++
 ...FlinkStreamer.java => HoodieFlinkStreamer.java} | 23 +++++++++---------
 .../hudi/operator/InstantGenerateOperator.java     | 27 +++++++++++-----------
 .../hudi/operator/KeyedWriteProcessFunction.java   |  6 ++---
 .../hudi/operator/KeyedWriteProcessOperator.java   |  5 ++--
 .../hudi/schema/FilebasedSchemaProvider.java       |  7 +++---
 .../org/apache/hudi/schema/SchemaProvider.java     |  3 ++-
 .../main/java/org/apache/hudi/sink/CommitSink.java | 15 ++++++------
 .../JsonStringToHoodieRecordMapFunction.java       | 11 +++++----
 .../java/org/apache/hudi/util/AvroConvertor.java   |  3 ++-
 .../java/org/apache/hudi/util/StreamerUtil.java    | 17 +++++++-------
 21 files changed, 102 insertions(+), 71 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractMergeHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractMergeHelper.java
index 8c92b00..e318fe3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractMergeHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/AbstractMergeHelper.java
@@ -18,14 +18,6 @@
 
 package org.apache.hudi.table.action.commit;
 
-import java.io.ByteArrayOutputStream;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericDatumWriter;
-import org.apache.avro.io.BinaryDecoder;
-import org.apache.avro.io.BinaryEncoder;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.io.EncoderFactory;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.utils.MergingIterator;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -37,15 +29,23 @@ import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.table.HoodieTable;
 
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Iterator;
 
 /**
- * Helper to read records from previous version of parquet and run Merge.
+ * Helper to read records from previous version of base file and run Merge.
  */
 public abstract class AbstractMergeHelper<T extends HoodieRecordPayload, I, K, O> {
 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 4e08003..f975406 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -97,6 +97,11 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
   }
 
   @Override
+  public void bootstrap(Option<Map<String, String>> extraMetadata) {
+    throw new HoodieNotSupportedException("Bootstrap operation is not supported yet");
+  }
+
+  @Override
   public List<WriteStatus> upsert(List<HoodieRecord<T>> records, String instantTime) {
     HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table =
         getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
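
For context, the new bootstrap override simply rejects the operation, while the list-based upsert mirrors the Spark client's RDD-based signature. Below is a minimal sketch of invoking it; client construction and commit handling are omitted, and the instant time is assumed to be generated upstream (e.g. by InstantGenerateOperator):

    import org.apache.hudi.client.HoodieFlinkWriteClient;
    import org.apache.hudi.client.WriteStatus;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.common.model.HoodieRecordPayload;

    import java.util.List;

    public class FlinkUpsertSketch {
      // Upsert one micro-batch of records under an already opened instant.
      // Calling bootstrap(...) on the same client would throw HoodieNotSupportedException.
      public static <T extends HoodieRecordPayload> List<WriteStatus> upsertBatch(
          HoodieFlinkWriteClient<T> client, List<HoodieRecord<T>> records, String instantTime) {
        return client.upsert(records, instantTime);
      }
    }
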
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
index 52052fd..74c921f 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/common/HoodieFlinkEngineContext.java
@@ -18,21 +18,23 @@
 
 package org.apache.hudi.client.common;
 
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.common.function.SerializableConsumer;
 import org.apache.hudi.client.common.function.SerializableFunction;
 import org.apache.hudi.client.common.function.SerializablePairFunction;
 import org.apache.hudi.common.config.SerializableConfiguration;
 import org.apache.hudi.common.util.Option;
-import scala.Tuple2;
+
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.hadoop.conf.Configuration;
 
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import scala.Tuple2;
+
 import static org.apache.hudi.client.common.function.FunctionWrapper.throwingFlatMapWrapper;
 import static org.apache.hudi.client.common.function.FunctionWrapper.throwingForeachWrapper;
 import static org.apache.hudi.client.common.function.FunctionWrapper.throwingMapToPairWrapper;
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java
index 83b76fc..427212c 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/index/FlinkHoodieIndex.java
@@ -19,7 +19,6 @@
 package org.apache.hudi.index;
 
 import org.apache.hudi.ApiMaturityLevel;
-import org.apache.hudi.PublicAPIMethod;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieEngineContext;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
@@ -31,6 +30,7 @@ import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.index.state.FlinkInMemoryStateIndex;
+import org.apache.hudi.PublicAPIMethod;
 import org.apache.hudi.table.HoodieTable;
 
 import java.util.List;
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
index 6d4c570..acb010c 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkCopyOnWriteTable.java
@@ -46,6 +46,14 @@ import org.apache.hudi.table.action.rollback.FlinkCopyOnWriteRollbackActionExecu
 import java.util.List;
 import java.util.Map;
 
+/**
+ * Implementation of a very heavily read-optimized Hoodie Table where, all data is stored in base files, with
+ * zero read amplification.
+ * <p>
+ * INSERTS - Produce new files, block aligned to desired size (or) Merge with the smallest existing file, to expand it
+ * <p>
+ * UPDATES - Produce a new version of the file, just replacing the updated records with new values
+ */
 public class HoodieFlinkCopyOnWriteTable<T extends HoodieRecordPayload> extends HoodieFlinkTable<T> {
 
   protected HoodieFlinkCopyOnWriteTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
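
The new class-level javadoc matches the Spark copy-on-write description. As a rough, hypothetical illustration of the INSERT routing it describes (expand the smallest base file still below the target size, otherwise start a new file); none of the names below are Hudi APIs:

    import java.util.Comparator;
    import java.util.List;
    import java.util.Optional;
    import java.util.UUID;

    class InsertRoutingSketch {
      static class BaseFile {
        final String fileId;
        final long sizeBytes;
        BaseFile(String fileId, long sizeBytes) {
          this.fileId = fileId;
          this.sizeBytes = sizeBytes;
        }
      }

      // Pick the smallest base file still under the target size so the insert
      // expands it toward the desired block size; otherwise open a new file.
      static String routeInsert(List<BaseFile> existing, long targetSizeBytes) {
        Optional<BaseFile> smallest = existing.stream()
            .filter(f -> f.sizeBytes < targetSizeBytes)
            .min(Comparator.comparingLong(f -> f.sizeBytes));
        return smallest.map(f -> f.fileId).orElseGet(() -> UUID.randomUUID().toString());
      }
    }
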
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 79fb376..3c09b38 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -35,7 +35,7 @@ import org.apache.hudi.index.HoodieIndex;
 
 import java.util.List;
 
-public  abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
+public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
     extends HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
   protected HoodieFlinkTable(HoodieWriteConfig config, HoodieEngineContext context, HoodieTableMetaClient metaClient) {
     super(config, context, metaClient);
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
index 0b98b11..1d40b8e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.table.action.commit;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -47,6 +46,7 @@ import org.apache.hudi.table.WorkloadProfile;
 import org.apache.hudi.table.WorkloadStat;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java
index bc1293a..57a87c4 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/FlinkDeleteHelper.java
@@ -94,7 +94,7 @@ public class FlinkDeleteHelper<R> extends
       List<HoodieRecord<EmptyHoodieRecordPayload>> dedupedRecords =
           dedupedKeys.stream().map(key -> new HoodieRecord<>(key, new EmptyHoodieRecordPayload())).collect(Collectors.toList());
       Instant beginTag = Instant.now();
-      // perform index loop up to get existing location of records
+      // perform index look up to get existing location of records
       List<HoodieRecord<EmptyHoodieRecordPayload>> taggedRecords =
           table.getIndex().tagLocation(dedupedRecords, context, table);
       Duration tagLocationDuration = Duration.between(beginTag, Instant.now());
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index 4bfaab4..2bcd3b2 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -49,6 +49,9 @@ import java.util.stream.Collectors;
 
 import scala.Tuple2;
 
+/**
+ * Packs incoming records to be upserted, into buckets.
+ */
 public class UpsertPartitioner<T extends HoodieRecordPayload<T>> implements Partitioner  {
 
   private static final Logger LOG = LogManager.getLogger(UpsertPartitioner.class);
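
The one-line javadoc matches the Spark UpsertPartitioner: records the index tagged as updates are routed to the bucket of the file they already live in, while new inserts are spread across insert buckets. A hypothetical sketch of that split, not the actual partitioner logic:

    class BucketAssignmentSketch {
      // Updates keep data locality by going back to their existing file;
      // inserts are hashed across a fixed number of insert buckets.
      static String assignBucket(String recordKey, String taggedFileId, int insertBuckets) {
        if (taggedFileId != null) {
          return "UPDATE:" + taggedFileId;
        }
        return "INSERT:" + Math.floorMod(recordKey.hashCode(), insertBuckets);
      }
    }
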
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java
index 1e666b1..8cf91a2 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/FlinkMarkerBasedRollbackStrategy.java
@@ -30,11 +30,12 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieRollbackException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.MarkerFiles;
-import scala.Tuple2;
 
 import java.util.List;
 import java.util.stream.Collectors;
 
+import scala.Tuple2;
+
 @SuppressWarnings("checkstyle:LineLength")
 public class FlinkMarkerBasedRollbackStrategy<T extends HoodieRecordPayload> extends AbstractMarkerBasedRollbackStrategy<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
   public FlinkMarkerBasedRollbackStrategy(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table, HoodieEngineContext context, HoodieWriteConfig config, String instantTime) {
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
index 2b619fb..612635d 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackHelper.java
@@ -50,6 +50,9 @@ import java.util.stream.Collectors;
 
 import scala.Tuple2;
 
+/**
+ * Performs Rollback of Hoodie Tables.
+ */
 public class ListingBasedRollbackHelper implements Serializable {
 
   private static final Logger LOG = LogManager.getLogger(ListingBasedRollbackHelper.class);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/HudiFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/HoodieFlinkStreamer.java
similarity index 99%
rename from hudi-flink/src/main/java/org/apache/hudi/HudiFlinkStreamer.java
rename to hudi-flink/src/main/java/org/apache/hudi/HoodieFlinkStreamer.java
index 1206cbe..0c9991d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/HudiFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/HoodieFlinkStreamer.java
@@ -18,6 +18,17 @@
 
 package org.apache.hudi;
 
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.operator.InstantGenerateOperator;
+import org.apache.hudi.operator.KeyedWriteProcessFunction;
+import org.apache.hudi.operator.KeyedWriteProcessOperator;
+import org.apache.hudi.sink.CommitSink;
+import org.apache.hudi.source.JsonStringToHoodieRecordMapFunction;
+import org.apache.hudi.util.StreamerUtil;
+
 import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
@@ -31,16 +42,6 @@ import org.apache.flink.runtime.state.filesystem.FsStateBackend;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.model.HoodieRecord;
-import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
-import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.operator.InstantGenerateOperator;
-import org.apache.hudi.operator.KeyedWriteProcessFunction;
-import org.apache.hudi.operator.KeyedWriteProcessOperator;
-import org.apache.hudi.sink.CommitSink;
-import org.apache.hudi.source.JsonStringToHoodieRecordMapFunction;
-import org.apache.hudi.util.StreamerUtil;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -51,7 +52,7 @@ import java.util.Properties;
  * An Utility which can incrementally consume data from Kafka and apply it to the target table.
  * currently, it only support COW table and insert, upsert operation.
  */
-public class HudiFlinkStreamer {
+public class HoodieFlinkStreamer {
   public static void main(String[] args) throws Exception {
     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
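
For orientation, the renamed entry point wires Kafka into the Hudi write path roughly as sketched below. The Config construction, the kafkaTopic field name and the use of SimpleStringSchema are assumptions for illustration; the real job also chains InstantGenerateOperator, KeyedWriteProcessOperator and CommitSink, which is omitted here:

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.hudi.HoodieFlinkStreamer;
    import org.apache.hudi.common.model.HoodieRecord;
    import org.apache.hudi.source.JsonStringToHoodieRecordMapFunction;
    import org.apache.hudi.util.StreamerUtil;

    public class StreamerPipelineSketch {
      public static void main(String[] args) throws Exception {
        // Normally populated by JCommander from the command line.
        HoodieFlinkStreamer.Config cfg = new HoodieFlinkStreamer.Config();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Operators such as InstantGenerateOperator read the config back from the runtime context.
        env.getConfig().setGlobalJobParameters(cfg);

        // Kafka JSON strings -> HoodieRecord, as in HoodieFlinkStreamer.
        DataStream<HoodieRecord> records = env
            .addSource(new FlinkKafkaConsumer<>(cfg.kafkaTopic, new SimpleStringSchema(), StreamerUtil.getKafkaProps(cfg)))
            .map(new JsonStringToHoodieRecordMapFunction(cfg));

        // Instant generation, keyed writes and the commit sink are chained here in the real job.
        env.execute("hoodie-flink-streamer-sketch");
      }
    }
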
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
index b242276..165eeb0 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/InstantGenerateOperator.java
@@ -18,17 +18,7 @@
 
 package org.apache.hudi.operator;
 
-import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.ListStateDescriptor;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hudi.HudiFlinkStreamer;
+import org.apache.hudi.HoodieFlinkStreamer;
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
@@ -42,6 +32,17 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.util.StreamerUtil;
 
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -63,7 +64,7 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
   private static final Logger LOG = LoggerFactory.getLogger(InstantGenerateOperator.class);
   public static final String NAME = "InstantGenerateOperator";
 
-  private HudiFlinkStreamer.Config cfg;
+  private HoodieFlinkStreamer.Config cfg;
   private HoodieFlinkWriteClient writeClient;
   private SerializableConfiguration serializableHadoopConf;
   private transient FileSystem fs;
@@ -87,7 +88,7 @@ public class InstantGenerateOperator extends AbstractStreamOperator<HoodieRecord
   public void open() throws Exception {
     super.open();
     // get configs from runtimeContext
-    cfg = (HudiFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+    cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
 
     // retry times
     retryTimes = Integer.valueOf(cfg.blockRetryTime);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java b/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
index 9a751e8..d3ebddf 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessFunction.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.operator;
 
-import org.apache.hudi.HudiFlinkStreamer;
+import org.apache.hudi.HoodieFlinkStreamer;
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -77,7 +77,7 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, Hood
   /**
    * Job conf.
    */
-  private HudiFlinkStreamer.Config cfg;
+  private HoodieFlinkStreamer.Config cfg;
 
   /**
    * Write Client.
@@ -90,7 +90,7 @@ public class KeyedWriteProcessFunction extends KeyedProcessFunction<String, Hood
 
     indexOfThisSubtask = getRuntimeContext().getIndexOfThisSubtask();
 
-    cfg = (HudiFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+    cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
 
     HoodieFlinkEngineContext context =
         new HoodieFlinkEngineContext(new SerializableConfiguration(new org.apache.hadoop.conf.Configuration()), new FlinkTaskContextSupplier(getRuntimeContext()));
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessOperator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessOperator.java
index beefe26..1d8e3c5 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessOperator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/KeyedWriteProcessOperator.java
@@ -18,13 +18,14 @@
 
 package org.apache.hudi.operator;
 
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieRecord;
+
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.hudi.client.WriteStatus;
-import org.apache.hudi.common.model.HoodieRecord;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java b/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java
index f174631..82699d9 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/schema/FilebasedSchemaProvider.java
@@ -18,14 +18,15 @@
 
 package org.apache.hudi.schema;
 
-import org.apache.avro.Schema;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.util.StreamerUtil;
 
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
 import java.io.IOException;
 import java.util.Collections;
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java b/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java
index f429a4c..74b4067 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/schema/SchemaProvider.java
@@ -18,9 +18,10 @@
 
 package org.apache.hudi.schema;
 
-import org.apache.avro.Schema;
 import org.apache.hudi.common.config.TypedProperties;
 
+import org.apache.avro.Schema;
+
 import java.io.Serializable;
 
 /**
diff --git a/hudi-flink/src/main/java/org/apache/hudi/sink/CommitSink.java b/hudi-flink/src/main/java/org/apache/hudi/sink/CommitSink.java
index 0660211..4ca7930 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/sink/CommitSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/sink/CommitSink.java
@@ -18,10 +18,6 @@
 
 package org.apache.hudi.sink;
 
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-import org.apache.hudi.HudiFlinkStreamer;
 import org.apache.hudi.client.FlinkTaskContextSupplier;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.client.WriteStatus;
@@ -29,8 +25,13 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieFlinkStreamerException;
+import org.apache.hudi.HoodieFlinkStreamer;
 import org.apache.hudi.util.StreamerUtil;
 
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,7 +51,7 @@ public class CommitSink extends RichSinkFunction<Tuple3<String, List<WriteStatus
   /**
    * Job conf.
    */
-  private HudiFlinkStreamer.Config cfg;
+  private HoodieFlinkStreamer.Config cfg;
 
   /**
    * Write client.
@@ -71,7 +72,7 @@ public class CommitSink extends RichSinkFunction<Tuple3<String, List<WriteStatus
   public void open(Configuration parameters) throws Exception {
     super.open(parameters);
     // Get configs from runtimeContext
-    cfg = (HudiFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
+    cfg = (HoodieFlinkStreamer.Config) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
 
     writeParallelSize = getRuntimeContext().getExecutionConfig().getParallelism();
 
@@ -81,7 +82,7 @@ public class CommitSink extends RichSinkFunction<Tuple3<String, List<WriteStatus
 
   @Override
   public void invoke(Tuple3<String, List<WriteStatus>, Integer> writeStatues, Context context) {
-    LOG.info("Receive records, instantTime = [{}], subtaskId = [{}], records size = [{}]", writeStatues.f0, writeStatues.f2, writeStatues.f1.size());
+    LOG.info("Receive records, instantTime = [{}], subtaskId = [{}], WriteStatus size = [{}]", writeStatues.f0, writeStatues.f2, writeStatues.f1.size());
     try {
       if (bufferedWriteStatus.containsKey(writeStatues.f0)) {
         bufferedWriteStatus.get(writeStatues.f0).add(writeStatues.f1);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/source/JsonStringToHoodieRecordMapFunction.java b/hudi-flink/src/main/java/org/apache/hudi/source/JsonStringToHoodieRecordMapFunction.java
index 01d5caf..f878484 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/source/JsonStringToHoodieRecordMapFunction.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/source/JsonStringToHoodieRecordMapFunction.java
@@ -18,9 +18,7 @@
 
 package org.apache.hudi.source;
 
-import org.apache.avro.generic.GenericRecord;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.hudi.HudiFlinkStreamer;
+import org.apache.hudi.HoodieFlinkStreamer;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieRecord;
@@ -30,6 +28,9 @@ import org.apache.hudi.schema.FilebasedSchemaProvider;
 import org.apache.hudi.util.AvroConvertor;
 import org.apache.hudi.util.StreamerUtil;
 
+import org.apache.avro.generic.GenericRecord;
+import org.apache.flink.api.common.functions.MapFunction;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -42,12 +43,12 @@ public class JsonStringToHoodieRecordMapFunction implements MapFunction<String,
 
   private static Logger LOG = LoggerFactory.getLogger(JsonStringToHoodieRecordMapFunction.class);
 
-  private final HudiFlinkStreamer.Config cfg;
+  private final HoodieFlinkStreamer.Config cfg;
   private TypedProperties props;
   private KeyGenerator keyGenerator;
   private AvroConvertor avroConvertor;
 
-  public JsonStringToHoodieRecordMapFunction(HudiFlinkStreamer.Config cfg) {
+  public JsonStringToHoodieRecordMapFunction(HoodieFlinkStreamer.Config cfg) {
     this.cfg = cfg;
     init();
   }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/AvroConvertor.java b/hudi-flink/src/main/java/org/apache/hudi/util/AvroConvertor.java
index cf2071e..40ce9f1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/AvroConvertor.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/AvroConvertor.java
@@ -18,10 +18,11 @@
 
 package org.apache.hudi.util;
 
+import org.apache.hudi.avro.MercifulJsonConverter;
+
 import com.twitter.bijection.Injection;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hudi.avro.MercifulJsonConverter;
 
 import java.io.Serializable;
 
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index c5f6269..db7ad25 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -18,11 +18,7 @@
 
 package org.apache.hudi.util;
 
-import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hudi.HudiFlinkStreamer;
+import org.apache.hudi.HoodieFlinkStreamer;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.fs.FSUtils;
@@ -37,6 +33,11 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.KeyGenerator;
 import org.apache.hudi.keygen.SimpleAvroKeyGenerator;
 import org.apache.hudi.schema.FilebasedSchemaProvider;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -51,14 +52,14 @@ public class StreamerUtil {
 
   private static Logger LOG = LoggerFactory.getLogger(StreamerUtil.class);
 
-  public static Properties getKafkaProps(HudiFlinkStreamer.Config cfg) {
+  public static Properties getKafkaProps(HoodieFlinkStreamer.Config cfg) {
     Properties result = new Properties();
     result.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, cfg.kafkaBootstrapServers);
     result.put(ConsumerConfig.GROUP_ID_CONFIG, cfg.kafkaGroupId);
     return result;
   }
 
-  public static TypedProperties getProps(HudiFlinkStreamer.Config cfg) {
+  public static TypedProperties getProps(HoodieFlinkStreamer.Config cfg) {
     return readConfig(
         FSUtils.getFs(cfg.propsFilePath, getHadoopConf()),
         new Path(cfg.propsFilePath), cfg.configs).getConfig();
@@ -130,7 +131,7 @@ public class StreamerUtil {
     }
   }
 
-  public static HoodieWriteConfig getHoodieClientConfig(HudiFlinkStreamer.Config cfg) {
+  public static HoodieWriteConfig getHoodieClientConfig(HoodieFlinkStreamer.Config cfg) {
     FileSystem fs = FSUtils.getFs(cfg.targetBasePath, getHadoopConf());
     HoodieWriteConfig.Builder builder =
         HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath).combineInput(cfg.filterDupes, true)