Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/04/26 10:59:35 UTC

[GitHub] [hudi] prashantwason commented on a diff in pull request #4309: [HUDI-3016][RFC-43] Proposal to implement Compaction/Clustering Servi…

prashantwason commented on code in PR #4309:
URL: https://github.com/apache/hudi/pull/4309#discussion_r858576683


##########
rfc/rfc-43/rfc-43.md:
##########
@@ -0,0 +1,257 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+# RFC-43: Implement Table Management Service for Hudi
+
+## Proposers
+
+- @yuzhaojing
+
+## Approvers
+
+- @vinothchandar
+- @Raymond
+
+## Status
+
+JIRA: [https://issues.apache.org/jira/browse/HUDI-3016](https://issues.apache.org/jira/browse/HUDI-3016)
+
+## Abstract
+
+Hudi tables need table management operations. Currently, there are three ways to schedule these jobs:
+
+- Inline: execute the table management job and the writing job in the same application, running them serially.
+
+- Async: execute the table management job and the writing job in the same application, running them asynchronously in parallel.
+
+- Independent compaction/clustering job: execute an async compaction/clustering job in a separate application.
+
+With the increase in the number of HUDI tables, maintenance costs will grow due to the lack of management
+capabilities. This proposal is to implement an independent compaction/clustering Service to manage Hudi
+compaction/clustering jobs.
+
+## Background
+
+In the current implementation, if a HUDI table needs compaction/clustering, there are only three ways:
+
+1. Inline compaction/clustering: in this mode the job blocks the writing job.
+
+2. Async compaction/clustering: in this mode the job executes asynchronously but shares resources with the HUDI
+   writing job, which may affect the stability of writes; this is not what the user wants to see.
+
+3. An independent compaction/clustering job: a better way to schedule the job. In this mode the job executes
+   asynchronously and does not share resources with the writing job, but it still has some issues:
+    1. Users have to enable lock service providers so that there is no data loss. In particular, while compaction/clustering
+       is getting scheduled, no other writes should proceed concurrently, hence a lock is required.
+    2. The user needs to manually start an async compaction/clustering application, which means that the user has to
+       maintain two jobs.
+    3. With the increase in the number of HUDI jobs, there is no unified service to manage compaction/clustering jobs
+       (monitoring, retries, history, etc.), which will increase maintenance costs.
+
+With this effort, we want to provide an independent compaction/clustering Service with these abilities:
+
+- Provide a pluggable execution interface that can adapt to multiple execution engines, such as Spark and Flink.
+
+- Support failover, which requires persisting the compaction/clustering messages.
+
+- Expose complete metrics externally, reusing HoodieMetric.
+
+- Provide automatic retry on failure for compaction/clustering jobs.
+
+## Implementation
+
+![](service.jpg)

Review Comment:
   The following interfaces would be required:
   1. API (REST / GRPC) - we would need GRPC.
      - The Request Handler would be common.
   2. Execution Engine:
      - This is the component which executes a Spark job (e.g. spark-submit) and returns the result.
   3. Metrics / alerts may also need to be added.
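
   A minimal sketch of what that common Request Handler could look like, assuming a plain Java interface (all names here are hypothetical, not part of the PR):

   ```java
   import java.util.Map;

   // Hypothetical sketch: a transport-agnostic request handler that both the
   // REST endpoints and a GRPC service implementation could delegate to.
   public interface TableServiceRequestHandler {

     // Persist a registration (table name, base path, configs) in the meta table.
     void register(String tableName, String basePath,
                   Map<String, String> compactionConfig,
                   Map<String, String> clusteringConfig);

     // Persist a schedule request for a given action ("compact"/"cluster") and instant.
     void schedule(String tableName, String basePath, String action, String instant);

     // Mark a previously scheduled plan as deleted.
     void delete(String tableName, String basePath, String action, String instant);
   }
   ```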



##########
rfc/rfc-43/rfc-43.md:
##########
@@ -0,0 +1,257 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+# RFC-43: Implement Table Management Service for Hudi
+
+## Proposers
+
+- @yuzhaojing
+
+## Approvers
+
+- @vinothchandar
+- @Raymond
+
+## Status
+
+JIRA: [https://issues.apache.org/jira/browse/HUDI-3016](https://issues.apache.org/jira/browse/HUDI-3016)
+
+## Abstract
+
+Hudi tables need table management operations. Currently, there are three ways to schedule these jobs:
+
+- Inline: execute the table management job and the writing job in the same application, running them serially.
+
+- Async: execute the table management job and the writing job in the same application, running them asynchronously in parallel.
+
+- Independent compaction/clustering job: execute an async compaction/clustering job in a separate application.
+
+With the increase in the number of HUDI tables, maintenance costs will grow due to the lack of management
+capabilities. This proposal is to implement an independent compaction/clustering Service to manage the Hudi

Review Comment:
   I think we should generalize this PR to include all types of "services" on HUDI Tables.
   
   A service on a HUDI table can be defined as any operation which needs to be run async. This includes compaction, clustering, async-indexing, cleaning, validation, etc.
   
   Service definition can take various key-value parameters like: 
    - basepath
    - priority
    - mainClass (java class to execute to run the service)
    - context (key=value params specific to the service execution)
   
   This allows adding more services in the future and running various other management operations on HUDI datasets.
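
   As a sketch of this generalization, a service definition could be modeled as a small value class like the following (hypothetical names; just an illustration of the key-value parameters listed above):

   ```java
   import java.util.Map;

   // Hypothetical sketch of a generic service definition: any async table
   // operation (compaction, clustering, indexing, cleaning, ...) is described
   // by the same small set of key-value parameters.
   public class ServiceDefinition {
     private final String basePath;              // base path of the target HUDI table
     private final int priority;                 // scheduling priority among services
     private final String mainClass;             // java class to execute to run the service
     private final Map<String, String> context;  // params specific to the service execution

     public ServiceDefinition(String basePath, int priority, String mainClass,
                              Map<String, String> context) {
       this.basePath = basePath;
       this.priority = priority;
       this.mainClass = mainClass;
       this.context = context;
     }
   }
   ```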



##########
rfc/rfc-43/rfc-43.md:
##########
@@ -0,0 +1,222 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-43: Implement Compaction/Clustering Service for Hudi
+
+## Proposers
+- @yuzhaojing
+
+## Approvers
+- @garyli1019
+- @leesf
+- @vinothchandar
+
+## Status
+JIRA: [https://issues.apache.org/jira/browse/HUDI-3016](https://issues.apache.org/jira/browse/HUDI-3016)
+
+## Abstract
+
+A Hudi table needs compaction/clustering to rewrite data. Currently, there are three ways to schedule compaction/clustering jobs:
+
+- Inline: execute the compaction/clustering job and the writing job in the same application, running them serially.
+
+- Async: execute the compaction/clustering job and the writing job in the same application, running them asynchronously in parallel.
+
+- Independent compaction/clustering job: execute an async compaction/clustering job in a separate application.
+
+With the increase in the number of HUDI tables, maintenance costs will grow due to the lack of management capabilities. This proposal is to implement an independent compaction/clustering Service to manage Hudi compaction/clustering jobs.
+
+## Background
+
+In the current implementation, if a HUDI table needs compaction/clustering, there are only three ways:
+
+1. Inline compaction/clustering: in this mode the job blocks the writing job.
+
+2. Async compaction/clustering: in this mode the job executes asynchronously but shares resources with the HUDI writing job, which may affect the stability of writes; this is not what the user wants to see.
+
+3. An independent compaction/clustering job: a better way to schedule the job. In this mode the job executes asynchronously and does not share resources with the writing job, but it still has some issues:
+   1. The user needs to manually start an async compaction/clustering application, which means that the user has to maintain two jobs.
+   2. With the increase in the number of HUDI jobs, there is no unified service to manage compaction/clustering jobs (monitoring, retries, history, etc.), which will increase maintenance costs.
+
+With this effort, we want to provide an independent compaction/clustering Service with these abilities:
+
+- Provide a pluggable execution interface that can adapt to multiple execution engines, such as Spark and Flink.
+
+- Support failover, which requires persisting the compaction/clustering messages.
+
+- Expose complete metrics externally, reusing HoodieMetric.
+
+- Provide automatic retry on failure for compaction/clustering jobs.
+
+## Implementation
+
+![](service.png)
+
+### Client
+
+Registers with the Service, passing the CompactionConfig/ClusteringConfig, when the job starts. When generating a compaction/clustering plan, it reports to the compaction/clustering Service address specified in HoodieConfig; it also requests the Service to stop the corresponding compaction/clustering job when the Service is no longer required, or to roll back a compaction/clustering instant.
+
+### Request Handler
+
+Receives the client's request and saves the plan in the Meta Table.
+
+### Meta Table
+
+The Meta Table is implemented as an internal HUDI MOR table that stores the required metadata for the compaction/clustering messages. This table is internal to a dataset and is not exposed directly to the user to write/modify.
+
+The Meta Table has the following features:
+
+1. Is a HUDI MOR Table
+
+2. Defines its own custom payload HoodieServicePayload
+
+3. Uses basePath + instant as the recordKey to guarantee uniqueness
+
+4. Follows all the conventions of a HUDI Table
+
+### Scheduler
+
+Periodically scans the Meta Table and submits compaction/clustering jobs according to user-specified rules; the execution engine is pluggable.
+
+### Schema
+
+Being an actual Hoodie Table, the Meta Table needs a schema to store the compaction/clustering messages.
+
+```
+{
+    "namespace":"org.apache.hudi.avro.model",
+    "type":"record",
+    "name":"HoodieServiceRecord",
+    "doc":"Compaction/Clustering message saved within the Meta Table",
+    "fields":[
+        {
+            "name":"table_name",
+            "type":"string"
+        },
+        {
+            "name":"base_path",
+            "type":"string"
+        },
+        {
+            "name":"instant",
+            "doc":"compaction/clustering instant for the hoodie table",
+            "type":"string"
+        },
+        {
+            "name":"action",
+            "doc":"compact or cluster for this instant",
+            "type":"string"
+        },
+        {
+            "name":"status",
+            "doc":"Status of the compaction/clustering job, 0:Scheduled 1:Running 2:Retrying 3:Completed",
+            "type":"int"
+        },
+        {
+            "name":"isDeleted",
+            "doc":"True if this plan has been deleted",
+            "type":"boolean"
+        }
+    ]
+}
+```
+
+## Interface design
+
+### Register
+
+/v1/hoodie/service/register
+
+```
+{
+"table_name":"table_name",
+"base_path":"/hoodie/base_path",
+"compaction_config":"compaction_config",
+"clustering_config":"clustering_config"
+}
+```
+
+### Schedule
+
+/v1/hoodie/service/schedule
+
+```
+{
+"table_name":"table_name",
+"base_path":"/hoodie/base_path",
+"action":"compact/cluster",
+"instant":"202111161559"
+}
+```
+
+
+### Delete
+
+/v1/hoodie/service/delete
+
+```
+{
+"table_name":"table_name",
+"base_path":"/hoodie/base_path",
+"action":"compact/cluster",
+"instant":"202111161559"
+}
+```
+
+## Error Handling
+
+Because two tables are involved in each operation, we need to ensure that they stay in sync and that errors during dataset operations are handled correctly.
+
+1. The Client scheduled the plan but the request failed.
+
+2. The RequestHandler received the request but the commit is not completed.
+
+3. The Client rolls back the plan after the request to the Compaction/Clustering Service.

Review Comment:
   Furthermore, we can implement a Java client for table services - HoodieTableServiceClient - which can enable scheduling any type of service from within the logic of a table operation.
   
   E.g. assume the HoodieWriteConfig enables the record level index. Since the index does not exist yet, it needs to be bootstrapped. There are two possibilities here:
   1. The user manually runs async-indexing to initialize the record index.
   2. The metadata table code detects that the record index is enabled but not present and calls HoodieTableServiceClient.scheduleService(...) to enable async indexing automatically.
   
   Scenario 2 above can be extended to various intelligent cases:
   1. The ingestion side code decides when it is time to compact the MOR table (based on some metric like read time / write time) rather than on a fixed schedule like after 10 deltacommits.
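
   A minimal sketch of such a client (the interface shape is hypothetical; only the scheduleService(...) idea comes from the comment above):

   ```java
   import java.util.Map;

   // Hypothetical sketch of the suggested HoodieTableServiceClient: table
   // operation code can programmatically schedule any type of async service.
   public interface HoodieTableServiceClient {

     // Schedule a service (e.g. "index", "compact", "cluster", "clean") against
     // the table at basePath; returns an id to track or cancel the request.
     String scheduleService(String basePath, String serviceType,
                            Map<String, String> context);
   }
   ```

   With this, the metadata table bootstrap path could call scheduleService(basePath, "index", ...) instead of requiring a manual async-indexing run.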
   



##########
rfc/rfc-43/rfc-43.md:
##########
@@ -0,0 +1,222 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+# RFC-43: Implement Compaction/Clustering Service for Hudi
+
+## Proposers
+- @yuzhaojing
+
+## Approvers
+- @garyli1019
+- @leesf
+- @vinothchandar
+
+## Status
+JIRA: [https://issues.apache.org/jira/browse/HUDI-3016](https://issues.apache.org/jira/browse/HUDI-3016)
+
+## Abstract
+
+A Hudi table needs compaction/clustering to rewrite data. Currently, there are three ways to schedule compaction/clustering jobs:
+
+- Inline: execute the compaction/clustering job and the writing job in the same application, running them serially.
+
+- Async: execute the compaction/clustering job and the writing job in the same application, running them asynchronously in parallel.
+
+- Independent compaction/clustering job: execute an async compaction/clustering job in a separate application.
+
+With the increase in the number of HUDI tables, maintenance costs will grow due to the lack of management capabilities. This proposal is to implement an independent compaction/clustering Service to manage Hudi compaction/clustering jobs.
+
+## Background
+
+In the current implementation, if a HUDI table needs compaction/clustering, there are only three ways:
+
+1. Inline compaction/clustering: in this mode the job blocks the writing job.
+
+2. Async compaction/clustering: in this mode the job executes asynchronously but shares resources with the HUDI writing job, which may affect the stability of writes; this is not what the user wants to see.
+
+3. An independent compaction/clustering job: a better way to schedule the job. In this mode the job executes asynchronously and does not share resources with the writing job, but it still has some issues:
+   1. The user needs to manually start an async compaction/clustering application, which means that the user has to maintain two jobs.
+   2. With the increase in the number of HUDI jobs, there is no unified service to manage compaction/clustering jobs (monitoring, retries, history, etc.), which will increase maintenance costs.
+
+With this effort, we want to provide an independent compaction/clustering Service with these abilities:
+
+- Provide a pluggable execution interface that can adapt to multiple execution engines, such as Spark and Flink.
+
+- Support failover, which requires persisting the compaction/clustering messages.
+
+- Expose complete metrics externally, reusing HoodieMetric.
+
+- Provide automatic retry on failure for compaction/clustering jobs.
+
+## Implementation
+
+![](service.png)
+
+### Client
+
+Registers with the Service, passing the CompactionConfig/ClusteringConfig, when the job starts. When generating a compaction/clustering plan, it reports to the compaction/clustering Service address specified in HoodieConfig; it also requests the Service to stop the corresponding compaction/clustering job when the Service is no longer required, or to roll back a compaction/clustering instant.
+
+### Request Handler
+
+Receives the client's request and saves the plan in the Meta Table.
+
+### Meta Table
+
+The Meta Table is implemented as an internal HUDI MOR table that stores the required metadata for the compaction/clustering messages. This table is internal to a dataset and is not exposed directly to the user to write/modify.
+
+The Meta Table has the following features:
+
+1. Is a HUDI MOR Table
+
+2. Defines its own custom payload HoodieServicePayload
+
+3. Uses basePath + instant as the recordKey to guarantee uniqueness
+
+4. Follows all the conventions of a HUDI Table
+
+### Scheduler
+
+Periodically scans the Meta Table and submits compaction/clustering jobs according to user-specified rules; the execution engine is pluggable.
+
+### Schema
+
+Being an actual Hoodie Table, the Meta Table needs a schema to store the compaction/clustering messages.
+
+```
+{
+    "namespace":"org.apache.hudi.avro.model",
+    "type":"record",
+    "name":"HoodieServiceRecord",
+    "doc":"Compaction/Clustering message saved within the Meta Table",
+    "fields":[
+        {
+            "name":"table_name",
+            "type":"string"
+        },
+        {
+            "name":"base_path",
+            "type":"string"
+        },
+        {
+            "name":"instant",
+            "doc":"compaction/clustering instant for the hoodie table",
+            "type":"string"
+        },
+        {
+            "name":"action",
+            "doc":"compact or cluster for this instant",
+            "type":"string"
+        },
+        {
+            "name":"status",
+            "doc":"Status of the compaction/clustering job, 0:Scheduled 1:Running 2:Retrying 3:Completed",
+            "type":"int"
+        },
+        {
+            "name":"isDeleted",
+            "doc":"True if this plan has been deleted",
+            "type":"boolean"
+        }
+    ]
+}
+```
+
+## Interface design
+
+### Register
+
+/v1/hoodie/service/register
+
+```
+{
+"table_name":"table_name",
+"base_path":"/hoodie/base_path",
+"compaction_config":"compaction_config",
+"clustering_config":"clustering_config"
+}
+```
+
+### Schedule
+
+/v1/hoodie/service/schedule
+
+```
+{
+"table_name":"table_name",
+"base_path":"/hoodie/base_path",
+"action":"compact/cluster",
+"instant":"202111161559"
+}
+```
+
+
+### Delete
+
+/v1/hoodie/service/delete
+
+```
+{
+"table_name":"table_name",
+"base_path":"/hoodie/base_path",
+"action":"compact/cluster",
+"instant":"202111161559"
+}
+```
+
+## Error Handling
+
+Because two tables are involved in each operation, we need to ensure that they stay in sync and that errors during dataset operations are handled correctly.
+
+1. The Client scheduled the plan but the request failed.
+
+2. The RequestHandler received the request but the commit is not completed.
+
+3. The Client rolls back the plan after the request to the Compaction/Clustering Service.

Review Comment:
   The pull-based mechanism works for a small number of tables. Scanning 1000s of tables for possible services is going to induce a lot of listing load.
   
   The push-based model is actually very elegant. Let me explain how I envision it:
   
   Assume a regular table which has some cleaning setting. In the current code, we schedule the clean and then execute the clean in the same Spark application. When hoodie.service.enable=true, the scheduling of the clean will happen as before, but the execution part (HoodieWriteClient.clean(...)) will detect that TableServices are enabled and, instead of running the clean, will schedule it with the TableService by calling its API endpoint.
   
   Benefits:
   1. Easy integration with HUDI's current functionality.
   2. No extra load to list datasets to find operations to run.
   3. Conforms to all HUDI write config settings (e.g. when to clean, when to compact).
   4. In case of downtime of TableServices, one can simply disable hoodie.service.enable and revert to the inline functionality.
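
   A minimal sketch of this push model, assuming the hypothetical HoodieTableServiceClient sketched in the earlier comment (class and method names invented for illustration):

   ```java
   import java.util.Map;

   // Hypothetical sketch of the push model: the clean is scheduled inline as
   // today, but when hoodie.service.enable=true its execution is pushed to the
   // external table service instead of running in this Spark application.
   public class CleanRunner {
     private final boolean serviceEnabled;          // value of hoodie.service.enable
     private final HoodieTableServiceClient client; // hypothetical client from above

     public CleanRunner(boolean serviceEnabled, HoodieTableServiceClient client) {
       this.serviceEnabled = serviceEnabled;
       this.client = client;
     }

     public void clean(String basePath) {
       String instant = scheduleClean(basePath);    // schedule the clean plan as today
       if (serviceEnabled) {
         // Delegate execution to the table service via its API endpoint.
         client.scheduleService(basePath, "clean", Map.of("instant", instant));
       } else {
         executeClean(basePath, instant);           // current inline behavior
       }
     }

     private String scheduleClean(String basePath) { /* existing scheduling logic */ return "0"; }

     private void executeClean(String basePath, String instant) { /* existing clean logic */ }
   }
   ```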
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org