You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/10/15 16:06:10 UTC

[GitHub] [hudi] wangxianghu opened a new pull request #2181: [HUDI-911] Add Blog about Hudi-Spark decoupling and Flink integration…

wangxianghu opened a new pull request #2181:
URL: https://github.com/apache/hudi/pull/2181


   …design
   
   ## *Tips*
   - *Thank you very much for contributing to Apache Hudi.*
   - *Please review https://hudi.apache.org/contributing.html before opening a pull request.*
   
   ## What is the purpose of the pull request
   
   *Add Blog about Hudi-Spark decoupling and Flink integration desigin*
   
   ## Brief change log
   
   *(for example:)*
     - *Modify AnnotationLocation checkstyle rule in checkstyle.xml*
   
   ## Verify this pull request
   
   *(Please pick either of the following options)*
   
   This pull request is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This pull request is already covered by existing tests, such as *(please describe tests)*.
   
   (or)
   
   This change added tests and can be verified as follows:
   
   *(example:)*
   
     - *Added integration tests for end-to-end.*
     - *Added HoodieClientWriteTest to verify the change.*
     - *Manually verified the change by running a job locally.*
   
   ## Committer checklist
   
    - [ ] Has a corresponding JIRA in PR title & commit
    
    - [ ] Commit message is descriptive of the change
    
    - [ ] CI is green
   
    - [ ] Necessary doc changes done or have another open PR
          
    - [ ] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] wangxianghu commented on a change in pull request #2181: [HUDI-911] Add Blog about Hudi-Spark decoupling and Flink integration…

Posted by GitBox <gi...@apache.org>.
wangxianghu commented on a change in pull request #2181:
URL: https://github.com/apache/hudi/pull/2181#discussion_r509822436



##########
File path: docs/_posts/2020-10-15-apache-hudi-meets-apache-flink.md
##########
@@ -0,0 +1,196 @@
+---
+title: "Apache Hudi meets Apache Flink"
+excerpt: "The design and latest progress of the integration of Apache Hudi and Apache Flink."
+author: wangxianghu
+category: blog
+---
+
+Apache Hudi (Hudi for short) is a data lake framework created at Uber. Hudi joined the Apache incubator for incubation in January 2019, and was promoted to the top Apache project in May 2020. It is one of the most popular data lake frameworks.
+
+## 1. Why decouple
+
+Hudi has been using Spark as its data processing engine since its birth. If users want to use Hudi as their data lake framework, they must introduce Spark into their platform technology stack. 
+A few years ago, using Spark as a big data processing engine can be said to be very common or even natural. Since Spark can either perform batch processing or use micro-batch to simulate streaming, one engine solves both streaming and batch problems. 
+However, in recent years, with the development of big data technology, Flink, which is also a big data processing engine, has gradually entered people's vision and has occupied a certain market in the field of computing engines. 
+In the big data technology community, forums and other territories, the voice of whether Hudi supports Flink has gradually appeared and has become more frequent. Therefore, it is a valuable thing to make Hudi support the Flink engine, and the first step of integrating the Flink engine is that Hudi and Spark are decoupled.
+
+In addition, looking at the mature, active, and viable frameworks in the big data, all frameworks are elegant in design and can be integrated with other frameworks and leverage each other's expertise. 
+Therefore, decoupling Hudi from Spark and turning it into an engine-independent data lake framework will undoubtedly create more possibilities for the integration of Hudi and other components, allowing Hudi to better integrate into the big data ecosystem.
+
+## 2. Challenges
+
+Hudi's internal use of Spark API is as common as our usual development and use of List. Since the data source reads the data, and finally writes the data to the table, Spark RDD is used as the main data structure everywhere, and even ordinary tools are implemented using the Spark API. 
+It can be said that Hudi is a universal data lake framework implemented by Spark. Hudi also leverages deep Spark functionality like custom partitioning, in-memory caching to implement indexing and file sizing using workload heuristics. 
+For some of these, Flink offers better out-of-box support (e.g using Flink’s state store for indexing) and can in fact, make Hudi approach real-time latencies more and more. 
+
+In addition, the primary engine integrated after this decoupling is Flink. Flink and Spark differ greatly in core abstraction. Spark believes that data is bounded, and its core abstraction is a limited set of data. 
+Flink believes that the essence of data is a stream, and its core abstract DataStream contains various operations on data. Hudi has a streaming first design (record level updates, record level streams), that arguably fit the Flink model more naturally. 
+At the same time, there are multiple RDDs operating at the same time in Hudi, and the processing result of one RDD is combined with another RDD. 
+This difference in abstraction and the reuse of intermediate results during implementation make it difficult for Hudi to use a unified API to operate both RDD and DataStream in terms of decoupling abstraction.
+
+## 3. Decoupling Spark
+In theory, Hudi uses Spark as its computing engine to use Spark's distributed computing power and RDD's rich operator capabilities. Apart from distributed computing power, Hudi uses RDD more as a data structure, and RDD is essentially a bounded data set. 
+Therefore, it is theoretically feasible to replace RDD with List (of course, it may sacrifice performance/scale). In order to ensure the performance and stability of the Hudi Spark version as much as possible. We can keep the setting the bounded data set as the basic operation unit. 
+Hudi's main operation API remains unchanged, and RDD is extracted as a generic type. The Spark engine implementation still uses RDD, and other engines use List or other bounded  data set according to the actual situation.
+
+### Decoupling principle
+1) Unified generics. The input records `JavaRDD<HoodieRecord>`, key of input records `JavaRDD<HoodieKey>`, and result of write operations `JavaRDD<WriteStatus>` used by the Spark API use generic `I,K,O` instead;
+
+2) De-sparkization. All APIs of the abstraction layer must have nothing to do with Spark. Involving specific operations that are difficult to implement in the abstract layer, rewrite them as abstract methods and introduce Spark subclasses.
+
+For example: Hudi uses the `JavaSparkContext#map()` method in many places. To de-spark, you need to hide the `JavaSparkContext`. For this problem, we introduced the `HoodieEngineContext#map()` method, which will block the specific implementation details of `map`, so as to achieve de-sparkization in abstraction.
+
+3) Minimize changes to the abstraction layer to ensure the original function and performance of Hudi;
+
+4) Replace the `JavaSparkContext` with the `HoodieEngineContext` abstract class to provide the running environment context.
+
+In addition, some of the core algorithms in Hudi, like [rollbacks](https://github.com/apache/hudi/pull/1756), has been redone without the need for computing a workload profile ahead of time, which used to rely on Spark caching. 
+
+## 4. Flink integration design
+Hudi's write operation is batch processing in nature, and the continuous mode of `DeltaStreamer` is realized by looping batch processing. In order to use a unified API, when Hudi integrates Flink, we choose to collect a batch of data before processing, and finally submit it in a unified manner (here we use List to collect data in Flink).
+In Hudi terminology, we will stream data for a given commit, but only publish the commits every so often, making it practical to scale storage on cloud storage and also tunable.
+
+The easiest way to think of batch operation is to use a time window. However, when using a window, when there is no data flowing in a window, there will be no output data, and it is difficult for the Flink sink to judge whether all the data from a given batch has been processed. 
+Therefore, we use Flink's checkpoint mechanism to collect batches. The data between every two barriers is a batch. When there is no data in a subtask, the mock result data is made up. 
+In this way, on the sink side, when each subtask has result data issued, it can be considered that a batch of data has been processed and the commit can be executed.
+
+The DAG is as follows:
+
+![dualism](/assets/images/blog/hudi-meets-flink/image1.png)
+
+ - **Source:** receives Kafka data and converts it into `List<HoodieRecord>`;
+ - **InstantGeneratorOperator:** generates a globally unique instant. When the previous instant is not completed or the current batch has no data, no new instant is created;
+ - **KeyBy partitionPath:** partitions according to `partitionPath` to avoid multiple subtasks from writing the same partition;
+ - **WriteProcessOperator:** performs a write operation. When there is no data in the current partition, it sends empty result data to the downstream to make up the number;
+ - **CommitSink:** receives the calculation results of the upstream task. When receiving the parallelism results, it is considered that all the upstream subtasks are completed and the commit is executed.
+
+Note:
+`InstantGeneratorOperator` and `WriteProcessOperator` are both custom Flink operators. `InstantGeneratorOperator` will block checking the state of the previous instant to ensure that there is only one instant in the global (or requested) state.
+`WriteProcessOperator` is the actual execution Where a write operation is performed, the write operation is triggered at checkpoint.
+
+### 4.1 Index design based on Flink State
+
+Stateful computing is one of the highlights of the Flink engine. Compared with using external storage, using Flink's built-in `State` can significantly improve the performance of Flink applications. 
+Therefore, it would be a good choice to implement a Hudi index based on Flink's State.
+
+The core of the Hudi index is to maintain the mapping of the Hudi key `HoodieKey` and the location of the Hudi data `HoodieRecordLocation`. 
+Therefore, based on the current design, we can simply maintain a `MapState<HoodieKey, HoodieRecordLocation>` in Flink UDF to map the `HoodieKey` and `HoodieRecordLocation`, and leave the fault tolerance and persistence of State to the Flink framework.
+
+![dualism](/assets/images/blog/hudi-meets-flink/image2.png)
+
+## 5. Implementation examples
+### 1) HoodieTable
+
+```
+/**
+  * Abstract implementation of a HoodieTable.
+  *
+  * @param <T> Sub type of HoodieRecordPayload
+  * @param <I> Type of inputs
+  * @param <K> Type of keys
+  * @param <O> Type of outputs
+  */
+public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+
+   protected final HoodieWriteConfig config;
+   protected final HoodieTableMetaClient metaClient;
+   protected final HoodieIndex<T, I, K, O> index;
+
+   public abstract HoodieWriteMetadata<O> upsert(HoodieEngineContext context, String instantTime,
+       I records);
+
+   public abstract HoodieWriteMetadata<O> insert(HoodieEngineContext context, String instantTime,
+       I records);
+
+   public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, String instantTime,
+       I records, Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);
+
+   ...
+}
+```
+
+`HoodieTable` is one of the core abstractions of Hudi, which defines operations such as `insert`, `upsert`, and `bulkInsert` supported by the table. 
+Take `upsert` as an example, the input data is changed from the original `JavaRDD<HoodieRecord> inputRdds` to `I records`, and the runtime `JavaSparkContext jsc` is changed to `HoodieEngineContext context`.
+
+From the class annotations, we can see that `T, I, K, O` represent the load data type, input data type, primary key type and output data type of Hudi operation respectively. 
+These generics will run through the entire abstraction layer.
+
+### 2) HoodieEngineContext
+
+```
+/**
+ * Base class contains the context information needed by the engine at runtime. It will be extended by different
+ * engine implementation if needed.
+ */
+public abstract class HoodieEngineContext {
+
+  public abstract <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism);
+
+  public abstract <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism);
+
+  public abstract <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism);
+
+  ......
+}
+```
+
+`HoodieEngineContext` plays the role of `JavaSparkContext`, it not only provides all the information that `JavaSparkContext` can provide, 
+but also encapsulates many methods such as `map`, `flatMap`, `foreach`, and hides The specific implementation of `JavaSparkContext#map()`,`JavaSparkContext#flatMap()`, `JavaSparkContext#foreach()` and other methods.
+
+Take the `map` method as an example. In the Spark implementation class `HoodieSparkEngineContext`, the `map` method is as follows:
+
+```
+  @Override
+  public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
+    return javaSparkContext.parallelize(data, parallelism).map(func::apply).collect();
+  }
+```
+
+In the engine that operates List, the implementation can be as follows (different methods need to pay attention to thread safety issues, use `parallel()` with caution):
+
+```
+  @Override
+  public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
+    return data.stream().parallel().map(func::apply).collect(Collectors.toList());
+  }
+```
+
+Note:
+The exception thrown in the map function can be solved by wrapping `SerializableFunction<I, O> func`.
+
+Here is a brief introduction to `SerializableFunction`:
+
+```
+@FunctionalInterface
+public interface SerializableFunction<I, O> extends Serializable {
+  O apply(I v1) throws Exception;
+}
+```
+
+This method is actually a variant of `java.util.function.Function`. The difference from `java.util.function.Function` is that `SerializableFunction` can be serialized and can throw exceptions. 
+This function is introduced because the input parameters that the `JavaSparkContext#map()` function can receive must be serializable. 
+At the same time, there are many exceptions that need to be thrown in the logic of Hudi, and the code for `try catch` in the Lambda expression will be omitted It is bloated and not very elegant.
+
+## 6. Current progress and follow-up plan
+
+### 6.1 Working time axis
+
+![dualism](/assets/images/blog/hudi-meets-flink/image3.png)
+
+[T3go](https://www.t3go.cn/)
+[Aliyun](https://cn.aliyun.com/)
+[SF-express](https://www.sf-express.com/cn/sc/)
+
+### 6.2 Follow-up plan
+
+#### 1) Promote the integration of Hudi and Flink
+
+Push the integration of Flink and Hudi to the community as soon as possible. In the initial stage, this feature may only support kafka data sources.

Review comment:
       > `kafka` -> `Kafka`?
   
   done, thanks for your detailed review




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] vinothchandar commented on a change in pull request #2181: [HUDI-911] Add Blog about Hudi-Spark decoupling and Flink integration…

Posted by GitBox <gi...@apache.org>.
vinothchandar commented on a change in pull request #2181:
URL: https://github.com/apache/hudi/pull/2181#discussion_r508732124



##########
File path: docs/_posts/2020-10-15-apache-hudi-meets-apache-flink.md
##########
@@ -0,0 +1,196 @@
+---
+title: "Apache Hudi meets Apache Flink"
+excerpt: "The design and latest progress of the integration of Apache Hudi and Apache Flink."
+author: wangxianghu
+category: blog
+---
+
+Apache Hudi is a data lake framework developed and open sourced by Uber. Hudi joined the Apache incubator for incubation in January 2019, and was promoted to the top Apache project in May 2020. It is one of the most popular data lake frameworks.

Review comment:
       "originally developed" or "created at Uber" ? Hudi is very much an Apache project. 

##########
File path: docs/_posts/2020-10-15-apache-hudi-meets-apache-flink.md
##########
@@ -0,0 +1,196 @@
+---
+title: "Apache Hudi meets Apache Flink"
+excerpt: "The design and latest progress of the integration of Apache Hudi and Apache Flink."
+author: wangxianghu
+category: blog
+---
+
+Apache Hudi is a data lake framework developed and open sourced by Uber. Hudi joined the Apache incubator for incubation in January 2019, and was promoted to the top Apache project in May 2020. It is one of the most popular data lake frameworks.
+
+## 1. Why decouple
+
+Hudi has been using Spark as its data processing engine since its birth. If users want to use Hudi as their data lake framework, they must introduce Spark into their platform technology stack. 
+A few years ago, using Spark as a big data processing engine can be said to be very common or even natural. Since Spark can either perform batch processing or use micro-batch to simulate streaming, one engine solves both streaming and batch problems. 
+However, in recent years, with the development of big data technology, Flink, which is also a big data processing engine, has gradually entered people's vision and has occupied a certain market in the field of computing engines. 
+In the big data technology community, forums and other territories, the voice of whether Hudi supports Flink has gradually appeared and has become more frequent. Therefore, it is a valuable thing to make Hudi support the Flink engine, and the first step of integrating the Flink engine is that Hudi and Spark are decoupled.
+
+In addition, looking at the mature, active, and viable frameworks in the big data, all frameworks are elegant in design and can be integrated with other frameworks and leverage each other's expertise. 
+Therefore, decoupling Hudi from Spark and turning it into an engine-independent data lake framework will undoubtedly create more possibilities for the integration of Hudi and other components, allowing Hudi to better integrate into the big data ecosystem.
+
+## 2. Challenges
+
+Hudi's internal use of Spark API is as common as our usual development and use of List. Since the data source reads the data, and finally writes the data to the table, Spark RDD is used as the main data structure everywhere, and even ordinary tools are implemented using the Spark API. 
+It can be said that Hudi is a universal data lake framework implemented by Spark. Hudi also leverages deep Spark functionality like custom partitioning, in-memory caching to implement indexing and file sizing using workload heuristics. 
+For some of these, Flink offers better out-of-box support (e.g using Flink’s state store for indexing) and can in fact, make Hudi approach real-time latencies more and more. 
+
+In addition, the primary engine integrated after this decoupling is Flink. Flink and Spark differ greatly in core abstraction. Spark believes that data is bounded, and its core abstraction is a limited set of data. 
+Flink believes that the essence of data is a stream, and its core abstract DataStream contains various operations on data. Hudi has a streaming first design (record level updates, record level streams), that arguably fit the Flink model more naturally. 
+At the same time, there are multiple RDDs operating at the same time in Hudi, and the processing result of one RDD is combined with another RDD. 
+This difference in abstraction and the reuse of intermediate results during implementation make it difficult for Hudi to use a unified API to operate both RDD and DataStream in terms of decoupling abstraction.
+
+## 3. Decoupling Spark
+In theory, Hudi uses Spark as its computing engine to use Spark's distributed computing power and RDD's rich operator capabilities. Apart from distributed computing power, Hudi uses RDD more as a data structure, and RDD is essentially a bounded data set. 
+Therefore, it is theoretically feasible to replace RDD with List (of course, it may sacrifice performance/scale). In order to ensure the performance and stability of the Hudi Spark version as much as possible. We can keep the setting the bounded data set as the basic operation unit. 
+Hudi's main operation API remains unchanged, and RDD is extracted as a generic type. The Spark engine implementation still uses RDD, and other engines use List or other bounded  data set according to the actual situation.
+
+### Decoupling principle
+1) Unified generics. The input records `JavaRDD<HoodieRecord>`, key of input records `JavaRDD<HoodieKey>`, and result of write operations `JavaRDD<WriteStatus>` used by the Spark API use generic `I,K,O` instead;
+
+2) De-sparkization. All APIs of the abstraction layer must have nothing to do with Spark. Involving specific operations that are difficult to implement in the abstract layer, rewrite them as abstract methods and introduce Spark subclasses.
+
+For example: Hudi uses the `JavaSparkContext#map()` method in many places. To de-spark, you need to hide the `JavaSparkContext`. For this problem, we introduced the `HoodieEngineContext#map()` method, which will block the specific implementation details of `map`, so as to achieve de-sparkization in abstraction.
+
+3) Minimize changes to the abstraction layer to ensure the original function and performance of Hudi;
+
+4) Replace the `JavaSparkContext` with the `HoodieEngineContext` abstract class to provide the running environment context.
+
+In addition, some of the core algorithms in Hudi, like rollbacks, has been redone without the need for computing a workload profile ahead of time, which used to rely on Spark caching. 

Review comment:
       may be link to marker based rollback PR?

##########
File path: docs/_posts/2020-10-15-apache-hudi-meets-apache-flink.md
##########
@@ -0,0 +1,196 @@
+---
+title: "Apache Hudi meets Apache Flink"
+excerpt: "The design and latest progress of the integration of Apache Hudi and Apache Flink."
+author: wangxianghu
+category: blog
+---
+
+Apache Hudi is a data lake framework developed and open sourced by Uber. Hudi joined the Apache incubator for incubation in January 2019, and was promoted to the top Apache project in May 2020. It is one of the most popular data lake frameworks.

Review comment:
       also just denote that Hudi means `Apache Hudi`. Can we say "Apache Hudi (hudi for short) is a data lake ..." 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] wangxianghu commented on pull request #2181: [HUDI-911] Add Blog about Hudi-Spark decoupling and Flink integration…

Posted by GitBox <gi...@apache.org>.
wangxianghu commented on pull request #2181:
URL: https://github.com/apache/hudi/pull/2181#issuecomment-709442394


   @vinothchandar @yanghua @leesf please take a look when free


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] wangxianghu commented on a change in pull request #2181: [HUDI-911] Add Blog about Hudi-Spark decoupling and Flink integration…

Posted by GitBox <gi...@apache.org>.
wangxianghu commented on a change in pull request #2181:
URL: https://github.com/apache/hudi/pull/2181#discussion_r508472082



##########
File path: docs/_posts/2020-10-15-apache-hudi-meets-apache-flink.md
##########
@@ -0,0 +1,196 @@
+---
+title: "Apache Hudi meets Apache Flink"
+excerpt: "The design and latest progress of the integration of Apache hudi and Apache Flink."
+author: wangxianghu
+category: blog
+---
+
+Apache Hudi is a data lake framework developed and open sourced by Uber. Hudi joined the Apache incubator for incubation in January 2019, and was promoted to the top Apache project in May 2020. It is one of the most popular data lake frameworks.
+
+## 1. Why decouple
+
+Hudi has been using Spark as its data processing engine since its birth. If users want to use Hudi as their data lake framework, they must introduce Spark into their platform technology stack. 
+A few years ago, using Spark as a big data processing engine can be said to be very common or even natural. Since Spark can either perform batch processing or use micro-batch to simulate streaming, one engine solves both streaming and batch problems. 
+However, in recent years, with the development of big data technology, Flink, which is also a big data processing engine, has gradually entered people's vision and has occupied a certain market in the field of computing engines. 
+In the big data technology community, forums and other territories, the voice of whether Hudi supports flink has gradually appeared and has become more frequent. Therefore, it is a valuable thing to make hudi support the flink engine, and the first step of integrating the Flink engine is that hudi and spark are decoupled.
+
+In addition, looking at the mature, active, and viable frameworks in the big data, all frameworks are elegant in design and can be integrated with other frameworks and leverage each other's expertise. 
+Therefore, decoupling Hudi from Spark and turning it into an engine-independent data lake framework will undoubtedly create more possibilities for the integration of Hudi and other components, allowing Hudi to better integrate into the big data ecosystem.
+
+## 2. Challenges
+
+Hudi's internal use of Spark API is as common as our usual development and use of List. Since the data source reads the data, and finally writes the data to the table, Spark RDD is used as the main data structure everywhere, and even ordinary tools are implemented using the Spark API. 
+It can be said that Hudi is a universal data lake framework implemented by Spark. Hudi also leverage deep Spark functionality like custom partitioning, in-memory caching to implement indexing and file sizing using workload heuristics. 
+For some of these, Flink offers better out-of-box support (e.g using Flink’s state store for indexing) and can in fact, make Hudi approach real-time latencies more and more. 
+
+In addition, the primary engine integrated after this decoupling is Flink. Flink and Spark differ greatly in core abstraction. Spark believes that data is bounded, and its core abstraction is a limited set of data. 
+Flink believes that the essence of data is stream, and its core abstract DataStream contains various operations on data. Hudi, has a streaming first design (record level updates, record level streams), that arguably fit the Flink model more naturally. 

Review comment:
       > `Hudi,` -> `Hudi`
   
   @yanghua Thanks, I have addressed all your concerns, please PTAL when free




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] yanghua commented on a change in pull request #2181: [HUDI-911] Add Blog about Hudi-Spark decoupling and Flink integration…

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #2181:
URL: https://github.com/apache/hudi/pull/2181#discussion_r509818607



##########
File path: docs/_posts/2020-10-15-apache-hudi-meets-apache-flink.md
##########
@@ -0,0 +1,196 @@
+---
+title: "Apache Hudi meets Apache Flink"
+excerpt: "The design and latest progress of the integration of Apache Hudi and Apache Flink."
+author: wangxianghu
+category: blog
+---
+
+Apache Hudi (Hudi for short) is a data lake framework created at Uber. Hudi joined the Apache incubator for incubation in January 2019, and was promoted to the top Apache project in May 2020. It is one of the most popular data lake frameworks.
+
+## 1. Why decouple
+
+Hudi has been using Spark as its data processing engine since its birth. If users want to use Hudi as their data lake framework, they must introduce Spark into their platform technology stack. 
+A few years ago, using Spark as a big data processing engine can be said to be very common or even natural. Since Spark can either perform batch processing or use micro-batch to simulate streaming, one engine solves both streaming and batch problems. 
+However, in recent years, with the development of big data technology, Flink, which is also a big data processing engine, has gradually entered people's vision and has occupied a certain market in the field of computing engines. 
+In the big data technology community, forums and other territories, the voice of whether Hudi supports Flink has gradually appeared and has become more frequent. Therefore, it is a valuable thing to make Hudi support the Flink engine, and the first step of integrating the Flink engine is that Hudi and Spark are decoupled.
+
+In addition, looking at the mature, active, and viable frameworks in the big data, all frameworks are elegant in design and can be integrated with other frameworks and leverage each other's expertise. 
+Therefore, decoupling Hudi from Spark and turning it into an engine-independent data lake framework will undoubtedly create more possibilities for the integration of Hudi and other components, allowing Hudi to better integrate into the big data ecosystem.
+
+## 2. Challenges
+
+Hudi's internal use of Spark API is as common as our usual development and use of List. Since the data source reads the data, and finally writes the data to the table, Spark RDD is used as the main data structure everywhere, and even ordinary tools are implemented using the Spark API. 
+It can be said that Hudi is a universal data lake framework implemented by Spark. Hudi also leverages deep Spark functionality like custom partitioning, in-memory caching to implement indexing and file sizing using workload heuristics. 
+For some of these, Flink offers better out-of-box support (e.g using Flink’s state store for indexing) and can in fact, make Hudi approach real-time latencies more and more. 
+
+In addition, the primary engine integrated after this decoupling is Flink. Flink and Spark differ greatly in core abstraction. Spark believes that data is bounded, and its core abstraction is a limited set of data. 
+Flink believes that the essence of data is a stream, and its core abstract DataStream contains various operations on data. Hudi has a streaming first design (record level updates, record level streams), that arguably fit the Flink model more naturally. 
+At the same time, there are multiple RDDs operating at the same time in Hudi, and the processing result of one RDD is combined with another RDD. 
+This difference in abstraction and the reuse of intermediate results during implementation make it difficult for Hudi to use a unified API to operate both RDD and DataStream in terms of decoupling abstraction.
+
+## 3. Decoupling Spark
+In theory, Hudi uses Spark as its computing engine to use Spark's distributed computing power and RDD's rich operator capabilities. Apart from distributed computing power, Hudi uses RDD more as a data structure, and RDD is essentially a bounded data set. 
+Therefore, it is theoretically feasible to replace RDD with List (of course, it may sacrifice performance/scale). In order to ensure the performance and stability of the Hudi Spark version as much as possible. We can keep the setting the bounded data set as the basic operation unit. 

Review comment:
       `the setting` -> `setting`

##########
File path: docs/_posts/2020-10-15-apache-hudi-meets-apache-flink.md
##########
@@ -0,0 +1,196 @@
+---
+title: "Apache Hudi meets Apache Flink"
+excerpt: "The design and latest progress of the integration of Apache Hudi and Apache Flink."
+author: wangxianghu
+category: blog
+---
+
+Apache Hudi (Hudi for short) is a data lake framework created at Uber. Hudi joined the Apache incubator for incubation in January 2019, and was promoted to the top Apache project in May 2020. It is one of the most popular data lake frameworks.
+
+## 1. Why decouple
+
+Hudi has been using Spark as its data processing engine since its birth. If users want to use Hudi as their data lake framework, they must introduce Spark into their platform technology stack. 
+A few years ago, using Spark as a big data processing engine can be said to be very common or even natural. Since Spark can either perform batch processing or use micro-batch to simulate streaming, one engine solves both streaming and batch problems. 
+However, in recent years, with the development of big data technology, Flink, which is also a big data processing engine, has gradually entered people's vision and has occupied a certain market in the field of computing engines. 
+In the big data technology community, forums and other territories, the voice of whether Hudi supports Flink has gradually appeared and has become more frequent. Therefore, it is a valuable thing to make Hudi support the Flink engine, and the first step of integrating the Flink engine is that Hudi and Spark are decoupled.
+
+In addition, looking at the mature, active, and viable frameworks in the big data, all frameworks are elegant in design and can be integrated with other frameworks and leverage each other's expertise. 
+Therefore, decoupling Hudi from Spark and turning it into an engine-independent data lake framework will undoubtedly create more possibilities for the integration of Hudi and other components, allowing Hudi to better integrate into the big data ecosystem.
+
+## 2. Challenges
+
+Hudi's internal use of Spark API is as common as our usual development and use of List. Since the data source reads the data, and finally writes the data to the table, Spark RDD is used as the main data structure everywhere, and even ordinary tools are implemented using the Spark API. 
+It can be said that Hudi is a universal data lake framework implemented by Spark. Hudi also leverages deep Spark functionality like custom partitioning, in-memory caching to implement indexing and file sizing using workload heuristics. 
+For some of these, Flink offers better out-of-box support (e.g using Flink’s state store for indexing) and can in fact, make Hudi approach real-time latencies more and more. 
+
+In addition, the primary engine integrated after this decoupling is Flink. Flink and Spark differ greatly in core abstraction. Spark believes that data is bounded, and its core abstraction is a limited set of data. 
+Flink believes that the essence of data is a stream, and its core abstract DataStream contains various operations on data. Hudi has a streaming first design (record level updates, record level streams), that arguably fit the Flink model more naturally. 
+At the same time, there are multiple RDDs operating at the same time in Hudi, and the processing result of one RDD is combined with another RDD. 
+This difference in abstraction and the reuse of intermediate results during implementation make it difficult for Hudi to use a unified API to operate both RDD and DataStream in terms of decoupling abstraction.
+
+## 3. Decoupling Spark
+In theory, Hudi uses Spark as its computing engine to use Spark's distributed computing power and RDD's rich operator capabilities. Apart from distributed computing power, Hudi uses RDD more as a data structure, and RDD is essentially a bounded data set. 
+Therefore, it is theoretically feasible to replace RDD with List (of course, it may sacrifice performance/scale). In order to ensure the performance and stability of the Hudi Spark version as much as possible. We can keep the setting the bounded data set as the basic operation unit. 
+Hudi's main operation API remains unchanged, and RDD is extracted as a generic type. The Spark engine implementation still uses RDD, and other engines use List or other bounded  data set according to the actual situation.
+
+### Decoupling principle
+1) Unified generics. The input records `JavaRDD<HoodieRecord>`, key of input records `JavaRDD<HoodieKey>`, and result of write operations `JavaRDD<WriteStatus>` used by the Spark API use generic `I,K,O` instead;
+
+2) De-sparkization. All APIs of the abstraction layer must have nothing to do with Spark. Involving specific operations that are difficult to implement in the abstract layer, rewrite them as abstract methods and introduce Spark subclasses.
+
+For example: Hudi uses the `JavaSparkContext#map()` method in many places. To de-spark, you need to hide the `JavaSparkContext`. For this problem, we introduced the `HoodieEngineContext#map()` method, which will block the specific implementation details of `map`, so as to achieve de-sparkization in abstraction.
+
+3) Minimize changes to the abstraction layer to ensure the original function and performance of Hudi;
+
+4) Replace the `JavaSparkContext` with the `HoodieEngineContext` abstract class to provide the running environment context.
+
+In addition, some of the core algorithms in Hudi, like [rollbacks](https://github.com/apache/hudi/pull/1756), has been redone without the need for computing a workload profile ahead of time, which used to rely on Spark caching. 
+
+## 4. Flink integration design
+Hudi's write operation is batch processing in nature, and the continuous mode of `DeltaStreamer` is realized by looping batch processing. In order to use a unified API, when Hudi integrates Flink, we choose to collect a batch of data before processing, and finally submit it in a unified manner (here we use List to collect data in Flink).
+In Hudi terminology, we will stream data for a given commit, but only publish the commits every so often, making it practical to scale storage on cloud storage and also tunable.
+
+The easiest way to think of batch operation is to use a time window. However, when using a window, when there is no data flowing in a window, there will be no output data, and it is difficult for the Flink sink to judge whether all the data from a given batch has been processed. 
+Therefore, we use Flink's checkpoint mechanism to collect batches. The data between every two barriers is a batch. When there is no data in a subtask, the mock result data is made up. 
+In this way, on the sink side, when each subtask has result data issued, it can be considered that a batch of data has been processed and the commit can be executed.
+
+The DAG is as follows:
+
+![dualism](/assets/images/blog/hudi-meets-flink/image1.png)
+
+ - **Source:** receives Kafka data and converts it into `List<HoodieRecord>`;
+ - **InstantGeneratorOperator:** generates a globally unique instant. When the previous instant is not completed or the current batch has no data, no new instant is created;
+ - **KeyBy partitionPath:** partitions according to `partitionPath` to avoid multiple subtasks from writing the same partition;
+ - **WriteProcessOperator:** performs a write operation. When there is no data in the current partition, it sends empty result data to the downstream to make up the number;
+ - **CommitSink:** receives the calculation results of the upstream task. When receiving the parallelism results, it is considered that all the upstream subtasks are completed and the commit is executed.
+
+Note:
+`InstantGeneratorOperator` and `WriteProcessOperator` are both custom Flink operators. `InstantGeneratorOperator` will block checking the state of the previous instant to ensure that there is only one instant in the global (or requested) state.
+`WriteProcessOperator` is the actual execution Where a write operation is performed, the write operation is triggered at checkpoint.
+
+### 4.1 Index design based on Flink State
+
+Stateful computing is one of the highlights of the Flink engine. Compared with using external storage, using Flink's built-in `State` can significantly improve the performance of Flink applications. 
+Therefore, it would be a good choice to implement a Hudi index based on Flink's State.
+
+The core of the Hudi index is to maintain the mapping of the Hudi key `HoodieKey` and the location of the Hudi data `HoodieRecordLocation`. 
+Therefore, based on the current design, we can simply maintain a `MapState<HoodieKey, HoodieRecordLocation>` in Flink UDF to map the `HoodieKey` and `HoodieRecordLocation`, and leave the fault tolerance and persistence of State to the Flink framework.
+
+![dualism](/assets/images/blog/hudi-meets-flink/image2.png)
+
+## 5. Implementation examples
+### 1) HoodieTable
+
+```
+/**
+  * Abstract implementation of a HoodieTable.
+  *
+  * @param <T> Sub type of HoodieRecordPayload
+  * @param <I> Type of inputs
+  * @param <K> Type of keys
+  * @param <O> Type of outputs
+  */
+public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+
+   protected final HoodieWriteConfig config;
+   protected final HoodieTableMetaClient metaClient;
+   protected final HoodieIndex<T, I, K, O> index;
+
+   public abstract HoodieWriteMetadata<O> upsert(HoodieEngineContext context, String instantTime,
+       I records);
+
+   public abstract HoodieWriteMetadata<O> insert(HoodieEngineContext context, String instantTime,
+       I records);
+
+   public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, String instantTime,
+       I records, Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);
+
+   ...
+}
+```
+
+`HoodieTable` is one of the core abstractions of Hudi, which defines operations such as `insert`, `upsert`, and `bulkInsert` supported by the table. 
+Take `upsert` as an example, the input data is changed from the original `JavaRDD<HoodieRecord> inputRdds` to `I records`, and the runtime `JavaSparkContext jsc` is changed to `HoodieEngineContext context`.
+
+From the class annotations, we can see that `T, I, K, O` represent the load data type, input data type, primary key type and output data type of Hudi operation respectively. 

Review comment:
       `represent ` -> `represents`

##########
File path: docs/_posts/2020-10-15-apache-hudi-meets-apache-flink.md
##########
@@ -0,0 +1,196 @@
+---
+title: "Apache Hudi meets Apache Flink"
+excerpt: "The design and latest progress of the integration of Apache Hudi and Apache Flink."
+author: wangxianghu
+category: blog
+---
+
+Apache Hudi (Hudi for short) is a data lake framework created at Uber. Hudi joined the Apache incubator for incubation in January 2019, and was promoted to the top Apache project in May 2020. It is one of the most popular data lake frameworks.
+
+## 1. Why decouple
+
+Hudi has been using Spark as its data processing engine since its birth. If users want to use Hudi as their data lake framework, they must introduce Spark into their platform technology stack. 
+A few years ago, using Spark as a big data processing engine can be said to be very common or even natural. Since Spark can either perform batch processing or use micro-batch to simulate streaming, one engine solves both streaming and batch problems. 
+However, in recent years, with the development of big data technology, Flink, which is also a big data processing engine, has gradually entered people's vision and has occupied a certain market in the field of computing engines. 
+In the big data technology community, forums and other territories, the voice of whether Hudi supports Flink has gradually appeared and has become more frequent. Therefore, it is a valuable thing to make Hudi support the Flink engine, and the first step of integrating the Flink engine is that Hudi and Spark are decoupled.
+
+In addition, looking at the mature, active, and viable frameworks in the big data, all frameworks are elegant in design and can be integrated with other frameworks and leverage each other's expertise. 
+Therefore, decoupling Hudi from Spark and turning it into an engine-independent data lake framework will undoubtedly create more possibilities for the integration of Hudi and other components, allowing Hudi to better integrate into the big data ecosystem.
+
+## 2. Challenges
+
+Hudi's internal use of Spark API is as common as our usual development and use of List. Since the data source reads the data, and finally writes the data to the table, Spark RDD is used as the main data structure everywhere, and even ordinary tools are implemented using the Spark API. 
+It can be said that Hudi is a universal data lake framework implemented by Spark. Hudi also leverages deep Spark functionality like custom partitioning, in-memory caching to implement indexing and file sizing using workload heuristics. 
+For some of these, Flink offers better out-of-box support (e.g using Flink’s state store for indexing) and can in fact, make Hudi approach real-time latencies more and more. 
+
+In addition, the primary engine integrated after this decoupling is Flink. Flink and Spark differ greatly in core abstraction. Spark believes that data is bounded, and its core abstraction is a limited set of data. 
+Flink believes that the essence of data is a stream, and its core abstract DataStream contains various operations on data. Hudi has a streaming first design (record level updates, record level streams), that arguably fit the Flink model more naturally. 
+At the same time, there are multiple RDDs operating at the same time in Hudi, and the processing result of one RDD is combined with another RDD. 
+This difference in abstraction and the reuse of intermediate results during implementation make it difficult for Hudi to use a unified API to operate both RDD and DataStream in terms of decoupling abstraction.
+
+## 3. Decoupling Spark
+In theory, Hudi uses Spark as its computing engine to use Spark's distributed computing power and RDD's rich operator capabilities. Apart from distributed computing power, Hudi uses RDD more as a data structure, and RDD is essentially a bounded data set. 
+Therefore, it is theoretically feasible to replace RDD with List (of course, it may sacrifice performance/scale). In order to ensure the performance and stability of the Hudi Spark version as much as possible. We can keep the setting the bounded data set as the basic operation unit. 
+Hudi's main operation API remains unchanged, and RDD is extracted as a generic type. The Spark engine implementation still uses RDD, and other engines use List or other bounded  data set according to the actual situation.
+
+### Decoupling principle
+1) Unified generics. The input records `JavaRDD<HoodieRecord>`, key of input records `JavaRDD<HoodieKey>`, and result of write operations `JavaRDD<WriteStatus>` used by the Spark API use generic `I,K,O` instead;
+
+2) De-sparkization. All APIs of the abstraction layer must have nothing to do with Spark. Involving specific operations that are difficult to implement in the abstract layer, rewrite them as abstract methods and introduce Spark subclasses.
+
+For example: Hudi uses the `JavaSparkContext#map()` method in many places. To de-spark, you need to hide the `JavaSparkContext`. For this problem, we introduced the `HoodieEngineContext#map()` method, which will block the specific implementation details of `map`, so as to achieve de-sparkization in abstraction.
+
+3) Minimize changes to the abstraction layer to ensure the original function and performance of Hudi;
+
+4) Replace the `JavaSparkContext` with the `HoodieEngineContext` abstract class to provide the running environment context.
+
+In addition, some of the core algorithms in Hudi, like [rollbacks](https://github.com/apache/hudi/pull/1756), has been redone without the need for computing a workload profile ahead of time, which used to rely on Spark caching. 

Review comment:
       `rollbacks` -> `rollback`, you used `has` not `have`

##########
File path: docs/_posts/2020-10-15-apache-hudi-meets-apache-flink.md
##########
@@ -0,0 +1,196 @@
+---
+title: "Apache Hudi meets Apache Flink"
+excerpt: "The design and latest progress of the integration of Apache Hudi and Apache Flink."
+author: wangxianghu
+category: blog
+---
+
+Apache Hudi (Hudi for short) is a data lake framework created at Uber. Hudi joined the Apache incubator for incubation in January 2019, and was promoted to the top Apache project in May 2020. It is one of the most popular data lake frameworks.
+
+## 1. Why decouple
+
+Hudi has been using Spark as its data processing engine since its birth. If users want to use Hudi as their data lake framework, they must introduce Spark into their platform technology stack. 
+A few years ago, using Spark as a big data processing engine can be said to be very common or even natural. Since Spark can either perform batch processing or use micro-batch to simulate streaming, one engine solves both streaming and batch problems. 
+However, in recent years, with the development of big data technology, Flink, which is also a big data processing engine, has gradually entered people's vision and has occupied a certain market in the field of computing engines. 
+In the big data technology community, forums and other territories, the voice of whether Hudi supports Flink has gradually appeared and has become more frequent. Therefore, it is a valuable thing to make Hudi support the Flink engine, and the first step of integrating the Flink engine is that Hudi and Spark are decoupled.
+
+In addition, looking at the mature, active, and viable frameworks in the big data, all frameworks are elegant in design and can be integrated with other frameworks and leverage each other's expertise. 
+Therefore, decoupling Hudi from Spark and turning it into an engine-independent data lake framework will undoubtedly create more possibilities for the integration of Hudi and other components, allowing Hudi to better integrate into the big data ecosystem.
+
+## 2. Challenges
+
+Hudi's internal use of Spark API is as common as our usual development and use of List. Since the data source reads the data, and finally writes the data to the table, Spark RDD is used as the main data structure everywhere, and even ordinary tools are implemented using the Spark API. 
+It can be said that Hudi is a universal data lake framework implemented by Spark. Hudi also leverages deep Spark functionality like custom partitioning, in-memory caching to implement indexing and file sizing using workload heuristics. 
+For some of these, Flink offers better out-of-box support (e.g using Flink’s state store for indexing) and can in fact, make Hudi approach real-time latencies more and more. 
+
+In addition, the primary engine integrated after this decoupling is Flink. Flink and Spark differ greatly in core abstraction. Spark believes that data is bounded, and its core abstraction is a limited set of data. 
+Flink believes that the essence of data is a stream, and its core abstract DataStream contains various operations on data. Hudi has a streaming first design (record level updates, record level streams), that arguably fit the Flink model more naturally. 
+At the same time, there are multiple RDDs operating at the same time in Hudi, and the processing result of one RDD is combined with another RDD. 
+This difference in abstraction and the reuse of intermediate results during implementation make it difficult for Hudi to use a unified API to operate both RDD and DataStream in terms of decoupling abstraction.
+
+## 3. Decoupling Spark
+In theory, Hudi uses Spark as its computing engine to use Spark's distributed computing power and RDD's rich operator capabilities. Apart from distributed computing power, Hudi uses RDD more as a data structure, and RDD is essentially a bounded data set. 
+Therefore, it is theoretically feasible to replace RDD with List (of course, it may sacrifice performance/scale). In order to ensure the performance and stability of the Hudi Spark version as much as possible. We can keep the setting the bounded data set as the basic operation unit. 
+Hudi's main operation API remains unchanged, and RDD is extracted as a generic type. The Spark engine implementation still uses RDD, and other engines use List or other bounded  data set according to the actual situation.
+
+### Decoupling principle
+1) Unified generics. The input records `JavaRDD<HoodieRecord>`, key of input records `JavaRDD<HoodieKey>`, and result of write operations `JavaRDD<WriteStatus>` used by the Spark API use generic `I,K,O` instead;
+
+2) De-sparkization. All APIs of the abstraction layer must have nothing to do with Spark. Involving specific operations that are difficult to implement in the abstract layer, rewrite them as abstract methods and introduce Spark subclasses.
+
+For example: Hudi uses the `JavaSparkContext#map()` method in many places. To de-spark, you need to hide the `JavaSparkContext`. For this problem, we introduced the `HoodieEngineContext#map()` method, which will block the specific implementation details of `map`, so as to achieve de-sparkization in abstraction.
+
+3) Minimize changes to the abstraction layer to ensure the original function and performance of Hudi;
+
+4) Replace the `JavaSparkContext` with the `HoodieEngineContext` abstract class to provide the running environment context.
+
+In addition, some of the core algorithms in Hudi, like [rollbacks](https://github.com/apache/hudi/pull/1756), has been redone without the need for computing a workload profile ahead of time, which used to rely on Spark caching. 
+
+## 4. Flink integration design
+Hudi's write operation is batch processing in nature, and the continuous mode of `DeltaStreamer` is realized by looping batch processing. In order to use a unified API, when Hudi integrates Flink, we choose to collect a batch of data before processing, and finally submit it in a unified manner (here we use List to collect data in Flink).
+In Hudi terminology, we will stream data for a given commit, but only publish the commits every so often, making it practical to scale storage on cloud storage and also tunable.
+
+The easiest way to think of batch operation is to use a time window. However, when using a window, when there is no data flowing in a window, there will be no output data, and it is difficult for the Flink sink to judge whether all the data from a given batch has been processed. 
+Therefore, we use Flink's checkpoint mechanism to collect batches. The data between every two barriers is a batch. When there is no data in a subtask, the mock result data is made up. 
+In this way, on the sink side, when each subtask has result data issued, it can be considered that a batch of data has been processed and the commit can be executed.
+
+The DAG is as follows:
+
+![dualism](/assets/images/blog/hudi-meets-flink/image1.png)
+
+ - **Source:** receives Kafka data and converts it into `List<HoodieRecord>`;
+ - **InstantGeneratorOperator:** generates a globally unique instant. When the previous instant is not completed or the current batch has no data, no new instant is created;
+ - **KeyBy partitionPath:** partitions according to `partitionPath` to avoid multiple subtasks from writing the same partition;
+ - **WriteProcessOperator:** performs a write operation. When there is no data in the current partition, it sends empty result data to the downstream to make up the number;
+ - **CommitSink:** receives the calculation results of the upstream task. When receiving the parallelism results, it is considered that all the upstream subtasks are completed and the commit is executed.
+
+Note:
+`InstantGeneratorOperator` and `WriteProcessOperator` are both custom Flink operators. `InstantGeneratorOperator` will block checking the state of the previous instant to ensure that there is only one instant in the global (or requested) state.
+`WriteProcessOperator` is the actual execution Where a write operation is performed, the write operation is triggered at checkpoint.
+
+### 4.1 Index design based on Flink State
+
+Stateful computing is one of the highlights of the Flink engine. Compared with using external storage, using Flink's built-in `State` can significantly improve the performance of Flink applications. 
+Therefore, it would be a good choice to implement a Hudi index based on Flink's State.
+
+The core of the Hudi index is to maintain the mapping of the Hudi key `HoodieKey` and the location of the Hudi data `HoodieRecordLocation`. 
+Therefore, based on the current design, we can simply maintain a `MapState<HoodieKey, HoodieRecordLocation>` in Flink UDF to map the `HoodieKey` and `HoodieRecordLocation`, and leave the fault tolerance and persistence of State to the Flink framework.
+
+![dualism](/assets/images/blog/hudi-meets-flink/image2.png)
+
+## 5. Implementation examples
+### 1) HoodieTable
+
+```
+/**
+  * Abstract implementation of a HoodieTable.
+  *
+  * @param <T> Sub type of HoodieRecordPayload
+  * @param <I> Type of inputs
+  * @param <K> Type of keys
+  * @param <O> Type of outputs
+  */
+public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+
+   protected final HoodieWriteConfig config;
+   protected final HoodieTableMetaClient metaClient;
+   protected final HoodieIndex<T, I, K, O> index;
+
+   public abstract HoodieWriteMetadata<O> upsert(HoodieEngineContext context, String instantTime,
+       I records);
+
+   public abstract HoodieWriteMetadata<O> insert(HoodieEngineContext context, String instantTime,
+       I records);
+
+   public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, String instantTime,
+       I records, Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);
+
+   ...
+}
+```
+
+`HoodieTable` is one of the core abstractions of Hudi, which defines operations such as `insert`, `upsert`, and `bulkInsert` supported by the table. 
+Take `upsert` as an example, the input data is changed from the original `JavaRDD<HoodieRecord> inputRdds` to `I records`, and the runtime `JavaSparkContext jsc` is changed to `HoodieEngineContext context`.
+
+From the class annotations, we can see that `T, I, K, O` represent the load data type, input data type, primary key type and output data type of Hudi operation respectively. 
+These generics will run through the entire abstraction layer.
+
+### 2) HoodieEngineContext
+
+```
+/**
+ * Base class contains the context information needed by the engine at runtime. It will be extended by different
+ * engine implementation if needed.
+ */
+public abstract class HoodieEngineContext {
+
+  public abstract <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism);
+
+  public abstract <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism);
+
+  public abstract <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism);
+
+  ......
+}
+```
+
+`HoodieEngineContext` plays the role of `JavaSparkContext`, it not only provides all the information that `JavaSparkContext` can provide, 
+but also encapsulates many methods such as `map`, `flatMap`, `foreach`, and hides The specific implementation of `JavaSparkContext#map()`,`JavaSparkContext#flatMap()`, `JavaSparkContext#foreach()` and other methods.
+
+Take the `map` method as an example. In the Spark implementation class `HoodieSparkEngineContext`, the `map` method is as follows:
+
+```
+  @Override
+  public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
+    return javaSparkContext.parallelize(data, parallelism).map(func::apply).collect();
+  }
+```
+
+In the engine that operates List, the implementation can be as follows (different methods need to pay attention to thread safety issues, use `parallel()` with caution):
+
+```
+  @Override
+  public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
+    return data.stream().parallel().map(func::apply).collect(Collectors.toList());
+  }
+```
+
+Note:
+The exception thrown in the map function can be solved by wrapping `SerializableFunction<I, O> func`.
+
+Here is a brief introduction to `SerializableFunction`:
+
+```
+@FunctionalInterface
+public interface SerializableFunction<I, O> extends Serializable {
+  O apply(I v1) throws Exception;
+}
+```
+
+This method is actually a variant of `java.util.function.Function`. The difference from `java.util.function.Function` is that `SerializableFunction` can be serialized and can throw exceptions. 
+This function is introduced because the input parameters that the `JavaSparkContext#map()` function can receive must be serializable. 
+At the same time, there are many exceptions that need to be thrown in the logic of Hudi, and the code for `try catch` in the Lambda expression will be omitted It is bloated and not very elegant.
+
+## 6. Current progress and follow-up plan
+
+### 6.1 Working time axis
+
+![dualism](/assets/images/blog/hudi-meets-flink/image3.png)
+
+[T3go](https://www.t3go.cn/)
+[Aliyun](https://cn.aliyun.com/)
+[SF-express](https://www.sf-express.com/cn/sc/)
+
+### 6.2 Follow-up plan
+
+#### 1) Promote the integration of Hudi and Flink
+
+Push the integration of Flink and Hudi to the community as soon as possible. In the initial stage, this feature may only support kafka data sources.

Review comment:
       `kafka` -> `Kafka`?

##########
File path: docs/_posts/2020-10-15-apache-hudi-meets-apache-flink.md
##########
@@ -0,0 +1,196 @@
+---
+title: "Apache Hudi meets Apache Flink"
+excerpt: "The design and latest progress of the integration of Apache Hudi and Apache Flink."
+author: wangxianghu
+category: blog
+---
+
+Apache Hudi (Hudi for short) is a data lake framework created at Uber. Hudi joined the Apache incubator for incubation in January 2019, and was promoted to the top Apache project in May 2020. It is one of the most popular data lake frameworks.
+
+## 1. Why decouple
+
+Hudi has been using Spark as its data processing engine since its birth. If users want to use Hudi as their data lake framework, they must introduce Spark into their platform technology stack. 
+A few years ago, using Spark as a big data processing engine can be said to be very common or even natural. Since Spark can either perform batch processing or use micro-batch to simulate streaming, one engine solves both streaming and batch problems. 
+However, in recent years, with the development of big data technology, Flink, which is also a big data processing engine, has gradually entered people's vision and has occupied a certain market in the field of computing engines. 
+In the big data technology community, forums and other territories, the voice of whether Hudi supports Flink has gradually appeared and has become more frequent. Therefore, it is a valuable thing to make Hudi support the Flink engine, and the first step of integrating the Flink engine is that Hudi and Spark are decoupled.
+
+In addition, looking at the mature, active, and viable frameworks in the big data, all frameworks are elegant in design and can be integrated with other frameworks and leverage each other's expertise. 
+Therefore, decoupling Hudi from Spark and turning it into an engine-independent data lake framework will undoubtedly create more possibilities for the integration of Hudi and other components, allowing Hudi to better integrate into the big data ecosystem.
+
+## 2. Challenges
+
+Hudi's internal use of Spark API is as common as our usual development and use of List. Since the data source reads the data, and finally writes the data to the table, Spark RDD is used as the main data structure everywhere, and even ordinary tools are implemented using the Spark API. 
+It can be said that Hudi is a universal data lake framework implemented by Spark. Hudi also leverages deep Spark functionality like custom partitioning, in-memory caching to implement indexing and file sizing using workload heuristics. 
+For some of these, Flink offers better out-of-box support (e.g using Flink’s state store for indexing) and can in fact, make Hudi approach real-time latencies more and more. 
+
+In addition, the primary engine integrated after this decoupling is Flink. Flink and Spark differ greatly in core abstraction. Spark believes that data is bounded, and its core abstraction is a limited set of data. 
+Flink believes that the essence of data is a stream, and its core abstract DataStream contains various operations on data. Hudi has a streaming first design (record level updates, record level streams), that arguably fit the Flink model more naturally. 
+At the same time, there are multiple RDDs operating at the same time in Hudi, and the processing result of one RDD is combined with another RDD. 
+This difference in abstraction and the reuse of intermediate results during implementation make it difficult for Hudi to use a unified API to operate both RDD and DataStream in terms of decoupling abstraction.
+
+## 3. Decoupling Spark
+In theory, Hudi uses Spark as its computing engine to use Spark's distributed computing power and RDD's rich operator capabilities. Apart from distributed computing power, Hudi uses RDD more as a data structure, and RDD is essentially a bounded data set. 
+Therefore, it is theoretically feasible to replace RDD with List (of course, it may sacrifice performance/scale). In order to ensure the performance and stability of the Hudi Spark version as much as possible. We can keep the setting the bounded data set as the basic operation unit. 
+Hudi's main operation API remains unchanged, and RDD is extracted as a generic type. The Spark engine implementation still uses RDD, and other engines use List or other bounded  data set according to the actual situation.
+
+### Decoupling principle
+1) Unified generics. The input records `JavaRDD<HoodieRecord>`, key of input records `JavaRDD<HoodieKey>`, and result of write operations `JavaRDD<WriteStatus>` used by the Spark API use generic `I,K,O` instead;
+
+2) De-sparkization. All APIs of the abstraction layer must have nothing to do with Spark. Involving specific operations that are difficult to implement in the abstract layer, rewrite them as abstract methods and introduce Spark subclasses.
+
+For example: Hudi uses the `JavaSparkContext#map()` method in many places. To de-spark, you need to hide the `JavaSparkContext`. For this problem, we introduced the `HoodieEngineContext#map()` method, which will block the specific implementation details of `map`, so as to achieve de-sparkization in abstraction.
+
+3) Minimize changes to the abstraction layer to ensure the original function and performance of Hudi;
+
+4) Replace the `JavaSparkContext` with the `HoodieEngineContext` abstract class to provide the running environment context.
+
+In addition, some of the core algorithms in Hudi, like [rollbacks](https://github.com/apache/hudi/pull/1756), has been redone without the need for computing a workload profile ahead of time, which used to rely on Spark caching. 
+
+## 4. Flink integration design
+Hudi's write operation is batch processing in nature, and the continuous mode of `DeltaStreamer` is realized by looping batch processing. In order to use a unified API, when Hudi integrates Flink, we choose to collect a batch of data before processing, and finally submit it in a unified manner (here we use List to collect data in Flink).
+In Hudi terminology, we will stream data for a given commit, but only publish the commits every so often, making it practical to scale storage on cloud storage and also tunable.
+
+The easiest way to think of batch operation is to use a time window. However, when using a window, when there is no data flowing in a window, there will be no output data, and it is difficult for the Flink sink to judge whether all the data from a given batch has been processed. 
+Therefore, we use Flink's checkpoint mechanism to collect batches. The data between every two barriers is a batch. When there is no data in a subtask, the mock result data is made up. 
+In this way, on the sink side, when each subtask has result data issued, it can be considered that a batch of data has been processed and the commit can be executed.
+
+The DAG is as follows:
+
+![dualism](/assets/images/blog/hudi-meets-flink/image1.png)
+
+ - **Source:** receives Kafka data and converts it into `List<HoodieRecord>`;
+ - **InstantGeneratorOperator:** generates a globally unique instant. When the previous instant is not completed or the current batch has no data, no new instant is created;
+ - **KeyBy partitionPath:** partitions according to `partitionPath` to avoid multiple subtasks from writing the same partition;
+ - **WriteProcessOperator:** performs a write operation. When there is no data in the current partition, it sends empty result data to the downstream to make up the number;
+ - **CommitSink:** receives the calculation results of the upstream task. When receiving the parallelism results, it is considered that all the upstream subtasks are completed and the commit is executed.
+
+Note:
+`InstantGeneratorOperator` and `WriteProcessOperator` are both custom Flink operators. `InstantGeneratorOperator` will block checking the state of the previous instant to ensure that there is only one instant in the global (or requested) state.
+`WriteProcessOperator` is the actual execution Where a write operation is performed, the write operation is triggered at checkpoint.
+
+### 4.1 Index design based on Flink State
+
+Stateful computing is one of the highlights of the Flink engine. Compared with using external storage, using Flink's built-in `State` can significantly improve the performance of Flink applications. 
+Therefore, it would be a good choice to implement a Hudi index based on Flink's State.
+
+The core of the Hudi index is to maintain the mapping of the Hudi key `HoodieKey` and the location of the Hudi data `HoodieRecordLocation`. 
+Therefore, based on the current design, we can simply maintain a `MapState<HoodieKey, HoodieRecordLocation>` in Flink UDF to map the `HoodieKey` and `HoodieRecordLocation`, and leave the fault tolerance and persistence of State to the Flink framework.
+
+![dualism](/assets/images/blog/hudi-meets-flink/image2.png)
+
+## 5. Implementation examples
+### 1) HoodieTable
+
+```
+/**
+  * Abstract implementation of a HoodieTable.
+  *
+  * @param <T> Sub type of HoodieRecordPayload
+  * @param <I> Type of inputs
+  * @param <K> Type of keys
+  * @param <O> Type of outputs
+  */
+public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+
+   protected final HoodieWriteConfig config;
+   protected final HoodieTableMetaClient metaClient;
+   protected final HoodieIndex<T, I, K, O> index;
+
+   public abstract HoodieWriteMetadata<O> upsert(HoodieEngineContext context, String instantTime,
+       I records);
+
+   public abstract HoodieWriteMetadata<O> insert(HoodieEngineContext context, String instantTime,
+       I records);
+
+   public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, String instantTime,
+       I records, Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);
+
+   ...
+}
+```
+
+`HoodieTable` is one of the core abstractions of Hudi, which defines operations such as `insert`, `upsert`, and `bulkInsert` supported by the table. 
+Take `upsert` as an example, the input data is changed from the original `JavaRDD<HoodieRecord> inputRdds` to `I records`, and the runtime `JavaSparkContext jsc` is changed to `HoodieEngineContext context`.
+
+From the class annotations, we can see that `T, I, K, O` represent the load data type, input data type, primary key type and output data type of Hudi operation respectively. 
+These generics will run through the entire abstraction layer.
+
+### 2) HoodieEngineContext
+
+```
+/**
+ * Base class contains the context information needed by the engine at runtime. It will be extended by different
+ * engine implementation if needed.
+ */
+public abstract class HoodieEngineContext {
+
+  public abstract <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism);
+
+  public abstract <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism);
+
+  public abstract <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism);
+
+  ......
+}
+```
+
+`HoodieEngineContext` plays the role of `JavaSparkContext`, it not only provides all the information that `JavaSparkContext` can provide, 
+but also encapsulates many methods such as `map`, `flatMap`, `foreach`, and hides The specific implementation of `JavaSparkContext#map()`,`JavaSparkContext#flatMap()`, `JavaSparkContext#foreach()` and other methods.
+
+Take the `map` method as an example. In the Spark implementation class `HoodieSparkEngineContext`, the `map` method is as follows:
+
+```
+  @Override
+  public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
+    return javaSparkContext.parallelize(data, parallelism).map(func::apply).collect();
+  }
+```
+
+In the engine that operates List, the implementation can be as follows (different methods need to pay attention to thread safety issues, use `parallel()` with caution):

Review comment:
       it would be better to use `thread-safety`?

##########
File path: docs/_posts/2020-10-15-apache-hudi-meets-apache-flink.md
##########
@@ -0,0 +1,196 @@
+---
+title: "Apache Hudi meets Apache Flink"
+excerpt: "The design and latest progress of the integration of Apache Hudi and Apache Flink."
+author: wangxianghu
+category: blog
+---
+
+Apache Hudi (Hudi for short) is a data lake framework created at Uber. Hudi joined the Apache incubator for incubation in January 2019, and was promoted to the top Apache project in May 2020. It is one of the most popular data lake frameworks.
+
+## 1. Why decouple
+
+Hudi has been using Spark as its data processing engine since its birth. If users want to use Hudi as their data lake framework, they must introduce Spark into their platform technology stack. 
+A few years ago, using Spark as a big data processing engine can be said to be very common or even natural. Since Spark can either perform batch processing or use micro-batch to simulate streaming, one engine solves both streaming and batch problems. 
+However, in recent years, with the development of big data technology, Flink, which is also a big data processing engine, has gradually entered people's vision and has occupied a certain market in the field of computing engines. 
+In the big data technology community, forums and other territories, the voice of whether Hudi supports Flink has gradually appeared and has become more frequent. Therefore, it is a valuable thing to make Hudi support the Flink engine, and the first step of integrating the Flink engine is that Hudi and Spark are decoupled.
+
+In addition, looking at the mature, active, and viable frameworks in the big data, all frameworks are elegant in design and can be integrated with other frameworks and leverage each other's expertise. 
+Therefore, decoupling Hudi from Spark and turning it into an engine-independent data lake framework will undoubtedly create more possibilities for the integration of Hudi and other components, allowing Hudi to better integrate into the big data ecosystem.
+
+## 2. Challenges
+
+Hudi's internal use of Spark API is as common as our usual development and use of List. Since the data source reads the data, and finally writes the data to the table, Spark RDD is used as the main data structure everywhere, and even ordinary tools are implemented using the Spark API. 
+It can be said that Hudi is a universal data lake framework implemented by Spark. Hudi also leverages deep Spark functionality like custom partitioning, in-memory caching to implement indexing and file sizing using workload heuristics. 
+For some of these, Flink offers better out-of-box support (e.g using Flink’s state store for indexing) and can in fact, make Hudi approach real-time latencies more and more. 
+
+In addition, the primary engine integrated after this decoupling is Flink. Flink and Spark differ greatly in core abstraction. Spark believes that data is bounded, and its core abstraction is a limited set of data. 
+Flink believes that the essence of data is a stream, and its core abstract DataStream contains various operations on data. Hudi has a streaming first design (record level updates, record level streams), that arguably fit the Flink model more naturally. 
+At the same time, there are multiple RDDs operating at the same time in Hudi, and the processing result of one RDD is combined with another RDD. 
+This difference in abstraction and the reuse of intermediate results during implementation make it difficult for Hudi to use a unified API to operate both RDD and DataStream in terms of decoupling abstraction.
+
+## 3. Decoupling Spark
+In theory, Hudi uses Spark as its computing engine to use Spark's distributed computing power and RDD's rich operator capabilities. Apart from distributed computing power, Hudi uses RDD more as a data structure, and RDD is essentially a bounded data set. 
+Therefore, it is theoretically feasible to replace RDD with List (of course, it may sacrifice performance/scale). In order to ensure the performance and stability of the Hudi Spark version as much as possible. We can keep the setting the bounded data set as the basic operation unit. 
+Hudi's main operation API remains unchanged, and RDD is extracted as a generic type. The Spark engine implementation still uses RDD, and other engines use List or other bounded  data set according to the actual situation.
+
+### Decoupling principle
+1) Unified generics. The input records `JavaRDD<HoodieRecord>`, key of input records `JavaRDD<HoodieKey>`, and result of write operations `JavaRDD<WriteStatus>` used by the Spark API use generic `I,K,O` instead;
+
+2) De-sparkization. All APIs of the abstraction layer must have nothing to do with Spark. Involving specific operations that are difficult to implement in the abstract layer, rewrite them as abstract methods and introduce Spark subclasses.
+
+For example: Hudi uses the `JavaSparkContext#map()` method in many places. To de-spark, you need to hide the `JavaSparkContext`. For this problem, we introduced the `HoodieEngineContext#map()` method, which will block the specific implementation details of `map`, so as to achieve de-sparkization in abstraction.
+
+3) Minimize changes to the abstraction layer to ensure the original function and performance of Hudi;
+
+4) Replace the `JavaSparkContext` with the `HoodieEngineContext` abstract class to provide the running environment context.
+
+In addition, some of the core algorithms in Hudi, like [rollbacks](https://github.com/apache/hudi/pull/1756), has been redone without the need for computing a workload profile ahead of time, which used to rely on Spark caching. 
+
+## 4. Flink integration design
+Hudi's write operation is batch processing in nature, and the continuous mode of `DeltaStreamer` is realized by looping batch processing. In order to use a unified API, when Hudi integrates Flink, we choose to collect a batch of data before processing, and finally submit it in a unified manner (here we use List to collect data in Flink).
+In Hudi terminology, we will stream data for a given commit, but only publish the commits every so often, making it practical to scale storage on cloud storage and also tunable.
+
+The easiest way to think of batch operation is to use a time window. However, when using a window, when there is no data flowing in a window, there will be no output data, and it is difficult for the Flink sink to judge whether all the data from a given batch has been processed. 
+Therefore, we use Flink's checkpoint mechanism to collect batches. The data between every two barriers is a batch. When there is no data in a subtask, the mock result data is made up. 
+In this way, on the sink side, when each subtask has result data issued, it can be considered that a batch of data has been processed and the commit can be executed.
+
+The DAG is as follows:
+
+![dualism](/assets/images/blog/hudi-meets-flink/image1.png)
+
+ - **Source:** receives Kafka data and converts it into `List<HoodieRecord>`;
+ - **InstantGeneratorOperator:** generates a globally unique instant. When the previous instant is not completed or the current batch has no data, no new instant is created;
+ - **KeyBy partitionPath:** partitions according to `partitionPath` to avoid multiple subtasks from writing the same partition;
+ - **WriteProcessOperator:** performs a write operation. When there is no data in the current partition, it sends empty result data to the downstream to make up the number;
+ - **CommitSink:** receives the calculation results of the upstream task. When receiving the parallelism results, it is considered that all the upstream subtasks are completed and the commit is executed.
+
+Note:
+`InstantGeneratorOperator` and `WriteProcessOperator` are both custom Flink operators. `InstantGeneratorOperator` will block checking the state of the previous instant to ensure that there is only one instant in the global (or requested) state.
+`WriteProcessOperator` is the actual execution Where a write operation is performed, the write operation is triggered at checkpoint.
+
+### 4.1 Index design based on Flink State
+
+Stateful computing is one of the highlights of the Flink engine. Compared with using external storage, using Flink's built-in `State` can significantly improve the performance of Flink applications. 
+Therefore, it would be a good choice to implement a Hudi index based on Flink's State.
+
+The core of the Hudi index is to maintain the mapping of the Hudi key `HoodieKey` and the location of the Hudi data `HoodieRecordLocation`. 
+Therefore, based on the current design, we can simply maintain a `MapState<HoodieKey, HoodieRecordLocation>` in Flink UDF to map the `HoodieKey` and `HoodieRecordLocation`, and leave the fault tolerance and persistence of State to the Flink framework.
+
+![dualism](/assets/images/blog/hudi-meets-flink/image2.png)
+
+## 5. Implementation examples
+### 1) HoodieTable
+
+```
+/**
+  * Abstract implementation of a HoodieTable.
+  *
+  * @param <T> Sub type of HoodieRecordPayload
+  * @param <I> Type of inputs
+  * @param <K> Type of keys
+  * @param <O> Type of outputs
+  */
+public abstract class HoodieTable<T extends HoodieRecordPayload, I, K, O> implements Serializable {
+
+   protected final HoodieWriteConfig config;
+   protected final HoodieTableMetaClient metaClient;
+   protected final HoodieIndex<T, I, K, O> index;
+
+   public abstract HoodieWriteMetadata<O> upsert(HoodieEngineContext context, String instantTime,
+       I records);
+
+   public abstract HoodieWriteMetadata<O> insert(HoodieEngineContext context, String instantTime,
+       I records);
+
+   public abstract HoodieWriteMetadata<O> bulkInsert(HoodieEngineContext context, String instantTime,
+       I records, Option<BulkInsertPartitioner<I>> bulkInsertPartitioner);
+
+   ...
+}
+```
+
+`HoodieTable` is one of the core abstractions of Hudi, which defines operations such as `insert`, `upsert`, and `bulkInsert` supported by the table. 
+Take `upsert` as an example, the input data is changed from the original `JavaRDD<HoodieRecord> inputRdds` to `I records`, and the runtime `JavaSparkContext jsc` is changed to `HoodieEngineContext context`.
+
+From the class annotations, we can see that `T, I, K, O` represent the load data type, input data type, primary key type and output data type of Hudi operation respectively. 
+These generics will run through the entire abstraction layer.
+
+### 2) HoodieEngineContext
+
+```
+/**
+ * Base class contains the context information needed by the engine at runtime. It will be extended by different
+ * engine implementation if needed.
+ */
+public abstract class HoodieEngineContext {
+
+  public abstract <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism);
+
+  public abstract <I, O> List<O> flatMap(List<I> data, SerializableFunction<I, Stream<O>> func, int parallelism);
+
+  public abstract <I> void foreach(List<I> data, SerializableConsumer<I> consumer, int parallelism);
+
+  ......
+}
+```
+
+`HoodieEngineContext` plays the role of `JavaSparkContext`, it not only provides all the information that `JavaSparkContext` can provide, 
+but also encapsulates many methods such as `map`, `flatMap`, `foreach`, and hides The specific implementation of `JavaSparkContext#map()`,`JavaSparkContext#flatMap()`, `JavaSparkContext#foreach()` and other methods.
+
+Take the `map` method as an example. In the Spark implementation class `HoodieSparkEngineContext`, the `map` method is as follows:
+
+```
+  @Override
+  public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
+    return javaSparkContext.parallelize(data, parallelism).map(func::apply).collect();
+  }
+```
+
+In the engine that operates List, the implementation can be as follows (different methods need to pay attention to thread safety issues, use `parallel()` with caution):
+
+```
+  @Override
+  public <I, O> List<O> map(List<I> data, SerializableFunction<I, O> func, int parallelism) {
+    return data.stream().parallel().map(func::apply).collect(Collectors.toList());
+  }
+```
+
+Note:
+The exception thrown in the map function can be solved by wrapping `SerializableFunction<I, O> func`.
+
+Here is a brief introduction to `SerializableFunction`:
+
+```
+@FunctionalInterface
+public interface SerializableFunction<I, O> extends Serializable {
+  O apply(I v1) throws Exception;
+}
+```
+
+This method is actually a variant of `java.util.function.Function`. The difference from `java.util.function.Function` is that `SerializableFunction` can be serialized and can throw exceptions. 
+This function is introduced because the input parameters that the `JavaSparkContext#map()` function can receive must be serializable. 
+At the same time, there are many exceptions that need to be thrown in the logic of Hudi, and the code for `try catch` in the Lambda expression will be omitted It is bloated and not very elegant.

Review comment:
       `try-catch`?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] yanghua merged pull request #2181: [HUDI-911] Add Blog about Hudi-Spark decoupling and Flink integration…

Posted by GitBox <gi...@apache.org>.
yanghua merged pull request #2181:
URL: https://github.com/apache/hudi/pull/2181


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] wangxianghu commented on a change in pull request #2181: [HUDI-911] Add Blog about Hudi-Spark decoupling and Flink integration…

Posted by GitBox <gi...@apache.org>.
wangxianghu commented on a change in pull request #2181:
URL: https://github.com/apache/hudi/pull/2181#discussion_r509225987



##########
File path: docs/_posts/2020-10-15-apache-hudi-meets-apache-flink.md
##########
@@ -0,0 +1,196 @@
+---
+title: "Apache Hudi meets Apache Flink"
+excerpt: "The design and latest progress of the integration of Apache Hudi and Apache Flink."
+author: wangxianghu
+category: blog
+---
+
+Apache Hudi is a data lake framework developed and open sourced by Uber. Hudi joined the Apache incubator for incubation in January 2019, and was promoted to the top Apache project in May 2020. It is one of the most popular data lake frameworks.

Review comment:
       > "originally developed" or "created at Uber" ? Hudi is very much an Apache project.
   
   yes better

##########
File path: docs/_posts/2020-10-15-apache-hudi-meets-apache-flink.md
##########
@@ -0,0 +1,196 @@
+---
+title: "Apache Hudi meets Apache Flink"
+excerpt: "The design and latest progress of the integration of Apache Hudi and Apache Flink."
+author: wangxianghu
+category: blog
+---
+
+Apache Hudi is a data lake framework developed and open sourced by Uber. Hudi joined the Apache incubator for incubation in January 2019, and was promoted to the top Apache project in May 2020. It is one of the most popular data lake frameworks.

Review comment:
       > also just denote that Hudi means `Apache Hudi`. Can we say "Apache Hudi (hudi for short) is a data lake ..."
   
   good idea

##########
File path: docs/_posts/2020-10-15-apache-hudi-meets-apache-flink.md
##########
@@ -0,0 +1,196 @@
+---
+title: "Apache Hudi meets Apache Flink"
+excerpt: "The design and latest progress of the integration of Apache Hudi and Apache Flink."
+author: wangxianghu
+category: blog
+---
+
+Apache Hudi is a data lake framework developed and open sourced by Uber. Hudi joined the Apache incubator for incubation in January 2019, and was promoted to the top Apache project in May 2020. It is one of the most popular data lake frameworks.
+
+## 1. Why decouple
+
+Hudi has been using Spark as its data processing engine since its birth. If users want to use Hudi as their data lake framework, they must introduce Spark into their platform technology stack. 
+A few years ago, using Spark as a big data processing engine can be said to be very common or even natural. Since Spark can either perform batch processing or use micro-batch to simulate streaming, one engine solves both streaming and batch problems. 
+However, in recent years, with the development of big data technology, Flink, which is also a big data processing engine, has gradually entered people's vision and has occupied a certain market in the field of computing engines. 
+In the big data technology community, forums and other territories, the voice of whether Hudi supports Flink has gradually appeared and has become more frequent. Therefore, it is a valuable thing to make Hudi support the Flink engine, and the first step of integrating the Flink engine is that Hudi and Spark are decoupled.
+
+In addition, looking at the mature, active, and viable frameworks in the big data, all frameworks are elegant in design and can be integrated with other frameworks and leverage each other's expertise. 
+Therefore, decoupling Hudi from Spark and turning it into an engine-independent data lake framework will undoubtedly create more possibilities for the integration of Hudi and other components, allowing Hudi to better integrate into the big data ecosystem.
+
+## 2. Challenges
+
+Hudi's internal use of Spark API is as common as our usual development and use of List. Since the data source reads the data, and finally writes the data to the table, Spark RDD is used as the main data structure everywhere, and even ordinary tools are implemented using the Spark API. 
+It can be said that Hudi is a universal data lake framework implemented by Spark. Hudi also leverages deep Spark functionality like custom partitioning, in-memory caching to implement indexing and file sizing using workload heuristics. 
+For some of these, Flink offers better out-of-box support (e.g using Flink’s state store for indexing) and can in fact, make Hudi approach real-time latencies more and more. 
+
+In addition, the primary engine integrated after this decoupling is Flink. Flink and Spark differ greatly in core abstraction. Spark believes that data is bounded, and its core abstraction is a limited set of data. 
+Flink believes that the essence of data is a stream, and its core abstract DataStream contains various operations on data. Hudi has a streaming first design (record level updates, record level streams), that arguably fit the Flink model more naturally. 
+At the same time, there are multiple RDDs operating at the same time in Hudi, and the processing result of one RDD is combined with another RDD. 
+This difference in abstraction and the reuse of intermediate results during implementation make it difficult for Hudi to use a unified API to operate both RDD and DataStream in terms of decoupling abstraction.
+
+## 3. Decoupling Spark
+In theory, Hudi uses Spark as its computing engine to use Spark's distributed computing power and RDD's rich operator capabilities. Apart from distributed computing power, Hudi uses RDD more as a data structure, and RDD is essentially a bounded data set. 
+Therefore, it is theoretically feasible to replace RDD with List (of course, it may sacrifice performance/scale). In order to ensure the performance and stability of the Hudi Spark version as much as possible. We can keep the setting the bounded data set as the basic operation unit. 
+Hudi's main operation API remains unchanged, and RDD is extracted as a generic type. The Spark engine implementation still uses RDD, and other engines use List or other bounded  data set according to the actual situation.
+
+### Decoupling principle
+1) Unified generics. The input records `JavaRDD<HoodieRecord>`, key of input records `JavaRDD<HoodieKey>`, and result of write operations `JavaRDD<WriteStatus>` used by the Spark API use generic `I,K,O` instead;
+
+2) De-sparkization. All APIs of the abstraction layer must have nothing to do with Spark. Involving specific operations that are difficult to implement in the abstract layer, rewrite them as abstract methods and introduce Spark subclasses.
+
+For example: Hudi uses the `JavaSparkContext#map()` method in many places. To de-spark, you need to hide the `JavaSparkContext`. For this problem, we introduced the `HoodieEngineContext#map()` method, which will block the specific implementation details of `map`, so as to achieve de-sparkization in abstraction.
+
+3) Minimize changes to the abstraction layer to ensure the original function and performance of Hudi;
+
+4) Replace the `JavaSparkContext` with the `HoodieEngineContext` abstract class to provide the running environment context.
+
+In addition, some of the core algorithms in Hudi, like rollbacks, has been redone without the need for computing a workload profile ahead of time, which used to rely on Spark caching. 

Review comment:
       > may be link to marker based rollback PR?
   
   done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hudi] yanghua commented on a change in pull request #2181: [HUDI-911] Add Blog about Hudi-Spark decoupling and Flink integration…

Posted by GitBox <gi...@apache.org>.
yanghua commented on a change in pull request #2181:
URL: https://github.com/apache/hudi/pull/2181#discussion_r508370772



##########
File path: docs/_posts/2020-10-15-apache-hudi-meets-apache-flink.md
##########
@@ -0,0 +1,196 @@
+---
+title: "Apache Hudi meets Apache Flink"
+excerpt: "The design and latest progress of the integration of Apache hudi and Apache Flink."
+author: wangxianghu
+category: blog
+---
+
+Apache Hudi is a data lake framework developed and open sourced by Uber. Hudi joined the Apache incubator for incubation in January 2019, and was promoted to the top Apache project in May 2020. It is one of the most popular data lake frameworks.
+
+## 1. Why decouple
+
+Hudi has been using Spark as its data processing engine since its birth. If users want to use Hudi as their data lake framework, they must introduce Spark into their platform technology stack. 
+A few years ago, using Spark as a big data processing engine can be said to be very common or even natural. Since Spark can either perform batch processing or use micro-batch to simulate streaming, one engine solves both streaming and batch problems. 
+However, in recent years, with the development of big data technology, Flink, which is also a big data processing engine, has gradually entered people's vision and has occupied a certain market in the field of computing engines. 
+In the big data technology community, forums and other territories, the voice of whether Hudi supports flink has gradually appeared and has become more frequent. Therefore, it is a valuable thing to make hudi support the flink engine, and the first step of integrating the Flink engine is that hudi and spark are decoupled.
+
+In addition, looking at the mature, active, and viable frameworks in the big data, all frameworks are elegant in design and can be integrated with other frameworks and leverage each other's expertise. 
+Therefore, decoupling Hudi from Spark and turning it into an engine-independent data lake framework will undoubtedly create more possibilities for the integration of Hudi and other components, allowing Hudi to better integrate into the big data ecosystem.
+
+## 2. Challenges
+
+Hudi's internal use of Spark API is as common as our usual development and use of List. Since the data source reads the data, and finally writes the data to the table, Spark RDD is used as the main data structure everywhere, and even ordinary tools are implemented using the Spark API. 
+It can be said that Hudi is a universal data lake framework implemented by Spark. Hudi also leverage deep Spark functionality like custom partitioning, in-memory caching to implement indexing and file sizing using workload heuristics. 

Review comment:
       leverage -> leverages

##########
File path: docs/_posts/2020-10-15-apache-hudi-meets-apache-flink.md
##########
@@ -0,0 +1,196 @@
+---
+title: "Apache Hudi meets Apache Flink"
+excerpt: "The design and latest progress of the integration of Apache hudi and Apache Flink."
+author: wangxianghu
+category: blog
+---
+
+Apache Hudi is a data lake framework developed and open sourced by Uber. Hudi joined the Apache incubator for incubation in January 2019, and was promoted to the top Apache project in May 2020. It is one of the most popular data lake frameworks.
+
+## 1. Why decouple
+
+Hudi has been using Spark as its data processing engine since its birth. If users want to use Hudi as their data lake framework, they must introduce Spark into their platform technology stack. 
+A few years ago, using Spark as a big data processing engine can be said to be very common or even natural. Since Spark can either perform batch processing or use micro-batch to simulate streaming, one engine solves both streaming and batch problems. 
+However, in recent years, with the development of big data technology, Flink, which is also a big data processing engine, has gradually entered people's vision and has occupied a certain market in the field of computing engines. 
+In the big data technology community, forums and other territories, the voice of whether Hudi supports flink has gradually appeared and has become more frequent. Therefore, it is a valuable thing to make hudi support the flink engine, and the first step of integrating the Flink engine is that hudi and spark are decoupled.

Review comment:
       make sure all "flink" -> "Flink", "spark" -> "Spark", "hudi" -> "Hudi"

##########
File path: docs/_posts/2020-10-15-apache-hudi-meets-apache-flink.md
##########
@@ -0,0 +1,196 @@
+---
+title: "Apache Hudi meets Apache Flink"
+excerpt: "The design and latest progress of the integration of Apache hudi and Apache Flink."
+author: wangxianghu
+category: blog
+---
+
+Apache Hudi is a data lake framework developed and open sourced by Uber. Hudi joined the Apache incubator for incubation in January 2019, and was promoted to the top Apache project in May 2020. It is one of the most popular data lake frameworks.
+
+## 1. Why decouple
+
+Hudi has been using Spark as its data processing engine since its birth. If users want to use Hudi as their data lake framework, they must introduce Spark into their platform technology stack. 
+A few years ago, using Spark as a big data processing engine can be said to be very common or even natural. Since Spark can either perform batch processing or use micro-batch to simulate streaming, one engine solves both streaming and batch problems. 
+However, in recent years, with the development of big data technology, Flink, which is also a big data processing engine, has gradually entered people's vision and has occupied a certain market in the field of computing engines. 
+In the big data technology community, forums and other territories, the voice of whether Hudi supports flink has gradually appeared and has become more frequent. Therefore, it is a valuable thing to make hudi support the flink engine, and the first step of integrating the Flink engine is that hudi and spark are decoupled.
+
+In addition, looking at the mature, active, and viable frameworks in the big data, all frameworks are elegant in design and can be integrated with other frameworks and leverage each other's expertise. 
+Therefore, decoupling Hudi from Spark and turning it into an engine-independent data lake framework will undoubtedly create more possibilities for the integration of Hudi and other components, allowing Hudi to better integrate into the big data ecosystem.
+
+## 2. Challenges
+
+Hudi's internal use of Spark API is as common as our usual development and use of List. Since the data source reads the data, and finally writes the data to the table, Spark RDD is used as the main data structure everywhere, and even ordinary tools are implemented using the Spark API. 
+It can be said that Hudi is a universal data lake framework implemented by Spark. Hudi also leverage deep Spark functionality like custom partitioning, in-memory caching to implement indexing and file sizing using workload heuristics. 
+For some of these, Flink offers better out-of-box support (e.g using Flink’s state store for indexing) and can in fact, make Hudi approach real-time latencies more and more. 
+
+In addition, the primary engine integrated after this decoupling is Flink. Flink and Spark differ greatly in core abstraction. Spark believes that data is bounded, and its core abstraction is a limited set of data. 
+Flink believes that the essence of data is stream, and its core abstract DataStream contains various operations on data. Hudi, has a streaming first design (record level updates, record level streams), that arguably fit the Flink model more naturally. 

Review comment:
       `stream ` -> `a stream`

##########
File path: docs/_posts/2020-10-15-apache-hudi-meets-apache-flink.md
##########
@@ -0,0 +1,196 @@
+---
+title: "Apache Hudi meets Apache Flink"
+excerpt: "The design and latest progress of the integration of Apache hudi and Apache Flink."
+author: wangxianghu
+category: blog
+---
+
+Apache Hudi is a data lake framework developed and open sourced by Uber. Hudi joined the Apache incubator for incubation in January 2019, and was promoted to the top Apache project in May 2020. It is one of the most popular data lake frameworks.
+
+## 1. Why decouple
+
+Hudi has been using Spark as its data processing engine since its birth. If users want to use Hudi as their data lake framework, they must introduce Spark into their platform technology stack. 
+A few years ago, using Spark as a big data processing engine can be said to be very common or even natural. Since Spark can either perform batch processing or use micro-batch to simulate streaming, one engine solves both streaming and batch problems. 
+However, in recent years, with the development of big data technology, Flink, which is also a big data processing engine, has gradually entered people's vision and has occupied a certain market in the field of computing engines. 
+In the big data technology community, forums and other territories, the voice of whether Hudi supports flink has gradually appeared and has become more frequent. Therefore, it is a valuable thing to make hudi support the flink engine, and the first step of integrating the Flink engine is that hudi and spark are decoupled.
+
+In addition, looking at the mature, active, and viable frameworks in the big data, all frameworks are elegant in design and can be integrated with other frameworks and leverage each other's expertise. 
+Therefore, decoupling Hudi from Spark and turning it into an engine-independent data lake framework will undoubtedly create more possibilities for the integration of Hudi and other components, allowing Hudi to better integrate into the big data ecosystem.
+
+## 2. Challenges
+
+Hudi's internal use of Spark API is as common as our usual development and use of List. Since the data source reads the data, and finally writes the data to the table, Spark RDD is used as the main data structure everywhere, and even ordinary tools are implemented using the Spark API. 
+It can be said that Hudi is a universal data lake framework implemented by Spark. Hudi also leverage deep Spark functionality like custom partitioning, in-memory caching to implement indexing and file sizing using workload heuristics. 
+For some of these, Flink offers better out-of-box support (e.g using Flink’s state store for indexing) and can in fact, make Hudi approach real-time latencies more and more. 
+
+In addition, the primary engine integrated after this decoupling is Flink. Flink and Spark differ greatly in core abstraction. Spark believes that data is bounded, and its core abstraction is a limited set of data. 
+Flink believes that the essence of data is stream, and its core abstract DataStream contains various operations on data. Hudi, has a streaming first design (record level updates, record level streams), that arguably fit the Flink model more naturally. 

Review comment:
       `Hudi,` -> `Hudi`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org