You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/10/05 19:59:09 UTC

[GitHub] [hudi] xushiyan commented on a diff in pull request #6732: [HUDI-4148] Add client for hudi table management service

xushiyan commented on code in PR #6732:
URL: https://github.com/apache/hudi/pull/6732#discussion_r985359891


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -1052,13 +875,29 @@ public void dropIndex(List<MetadataPartitionType> partitionTypes) {
     }
   }
 
+  /**
+   * Performs Clustering for the workload stored in instant-time.
+   *
+   * @param clusteringInstantTime Clustering Instant Time
+   * @return Collection of WriteStatus to inspect errors and counts
+   */
+  public HoodieWriteMetadata<O> cluster(String clusteringInstantTime) {
+    if (delegateToTableManagerService(config, ActionType.replacecommit)) {
+      throw new HoodieException(ActionType.replacecommit.name() + " delegate to table management service!");

Review Comment:
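   Please align on the name: the message here says "table management service", while the method name says "TableManagerService".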



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseTableServiceClient.java:
##########
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.TableServiceType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metrics.HoodieMetrics;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class BaseTableServiceClient<O> extends CommonHoodieClient {

Review Comment:
   Please rename this to BaseHoodieTableServiceClient to align with BaseHoodieWriteClient.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.table.manager;
+
+import org.apache.hudi.common.config.HoodieTableManagerConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieRemoteException;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Client which send the table service instants to the table management service.
+ */
+public class HoodieTableManagerClient {
+
+  private static final String BASE_URL = "/v1/hoodie/service";
+
+  public static final String REGISTER_ENDPOINT = String.format("%s/%s", BASE_URL, "register");
+
+  public static final String EXECUTE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/execute");
+  public static final String DELETE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/delete");
+
+  public static final String EXECUTE_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/execute");
+  public static final String DELETE_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/delete");
+
+  public static final String EXECUTE_CLEAN = String.format("%s/%s", BASE_URL, "clean/execute");
+  public static final String DELETE_CLEAN = String.format("%s/%s", BASE_URL, "clean/delete");
+
+  public static final String DATABASE_NAME_PARAM = "db_name";
+  public static final String TABLE_NAME_PARAM = "table_name";
+  public static final String BASEPATH_PARAM = "basepath";
+  public static final String INSTANT_PARAM = "instant";
+  public static final String USERNAME = "username";
+  public static final String CLUSTER = "cluster";
+  public static final String QUEUE = "queue";
+  public static final String RESOURCE = "resource";
+  public static final String PARALLELISM = "parallelism";
+  public static final String EXTRA_PARAMS = "extra_params";
+  public static final String EXECUTION_ENGINE = "execution_engine";
+
+  private final HoodieTableManagerConfig config;
+  private final HoodieTableMetaClient metaClient;
+  private final String host;
+  private final int port;
+  private final String basePath;
+  private final String dbName;
+  private final String tableName;
+
+  private static final Logger LOG = LogManager.getLogger(HoodieTableManagerClient.class);
+
+  public HoodieTableManagerClient(HoodieTableMetaClient metaClient, HoodieTableManagerConfig config) {
+    this.basePath = metaClient.getBasePathV2().toString();
+    this.dbName = metaClient.getTableConfig().getDatabaseName();
+    this.tableName = metaClient.getTableConfig().getTableName();
+    this.host = config.getTableManagerHost();
+    this.port = config.getTableManagerPort();
+    this.config = config;
+    this.metaClient = metaClient;
+  }
+
+  private String executeRequest(String requestPath, Map<String, String> queryParameters) throws IOException {
+    URIBuilder builder =
+        new URIBuilder().setHost(host).setPort(port).setPath(requestPath).setScheme("http");

Review Comment:
   Shouldn't the server URI be configurable, including scheme, host, port, etc., so that we don't hard-code http here?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/RunsTableService.java:
##########
@@ -34,4 +35,12 @@ default boolean tableServicesEnabled(HoodieWriteConfig config) {
     }
     return enabled;
   }
+
+  default boolean delegateToTableManagerService(HoodieWriteConfig config, ActionType actionType) {

Review Comment:
   The name useTableManagerService would suit this better.



##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java:
##########
@@ -270,51 +254,8 @@ public void preWrite(String instantTime, WriteOperationType writeOperationType,
     // remove the async cleaning
   }
 
-  @Override
-  protected void writeTableMetadata(HoodieTable table, String instantTime, String actionType, HoodieCommitMetadata metadata) {

Review Comment:
   Please annotate the diff with comments that properly explain the code removals.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java:
##########
@@ -418,43 +311,13 @@ protected HoodieTable doInitTable(HoodieTableMetaClient metaClient, Option<Strin
       // Initialize Metadata Table to make sure it's bootstrapped _before_ the operation,
       // if it didn't exist before
       // See https://issues.apache.org/jira/browse/HUDI-3343 for more details
-      initializeMetadataTable(instantTime);
+      ((SparkRDDTableServiceClient) tableServiceClient).initializeMetadataTable(instantTime);
     }
 
     // Create a Hoodie table which encapsulated the commits and files visible
     return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
   }
 
-  /**
-   * Initialize the metadata table if needed. Creating the metadata table writer
-   * will trigger the initial bootstrapping from the data table.
-   *
-   * @param inFlightInstantTimestamp - The in-flight action responsible for the metadata table initialization
-   */
-  private void initializeMetadataTable(Option<String> inFlightInstantTimestamp) {
-    if (config.isMetadataTableEnabled()) {
-      SparkHoodieBackedTableMetadataWriter.create(context.getHadoopConf().get(), config,
-          context, Option.empty(), inFlightInstantTimestamp);
-    }
-  }
-
-  // TODO : To enforce priority between table service and ingestion writer, use transactions here and invoke strategy
-  private void completeTableService(TableServiceType tableServiceType, HoodieCommitMetadata metadata,

Review Comment:
   Ditto: please annotate the diff with comments that explain the code removals.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -125,7 +114,7 @@
  * @param <K> Type of keys
  * @param <O> Type of outputs
  */
-public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, O> extends BaseHoodieClient
+public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K, O> extends CommonHoodieClient

Review Comment:
   So the removed code is moved to the parent and TableServiceClient, right? For any removal from this class, please annotate the diff to clarify where the code was moved to; it helps expedite the review process.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/RunsTableService.java:
##########
@@ -34,4 +35,12 @@ default boolean tableServicesEnabled(HoodieWriteConfig config) {
     }
     return enabled;
   }
+
+  default boolean delegateToTableManagerService(HoodieWriteConfig config, ActionType actionType) {
+    boolean supportsAction = config.getTableManagerConfig().isTableManagerSupportsAction(actionType);
+    if (supportsAction) {
+      LOG.warn(actionType.name() + " delegate to table manager service!");
+    }
+    return supportsAction;

Review Comment:
   BaseTableServiceClient repeats this. Can we adopt the interface there?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -1052,13 +875,29 @@ public void dropIndex(List<MetadataPartitionType> partitionTypes) {
     }
   }
 
+  /**
+   * Performs Clustering for the workload stored in instant-time.
+   *
+   * @param clusteringInstantTime Clustering Instant Time
+   * @return Collection of WriteStatus to inspect errors and counts
+   */
+  public HoodieWriteMetadata<O> cluster(String clusteringInstantTime) {
+    if (delegateToTableManagerService(config, ActionType.replacecommit)) {

Review Comment:
   So do we call it Table Management Service or Table Manager Service? Please align this with the RFC doc. Or do we simply call it Table Manager?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.table.manager;
+
+import org.apache.hudi.common.config.HoodieTableManagerConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieRemoteException;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Client which send the table service instants to the table management service.
+ */
+public class HoodieTableManagerClient {
+
+  private static final String BASE_URL = "/v1/hoodie/service";
+
+  public static final String REGISTER_ENDPOINT = String.format("%s/%s", BASE_URL, "register");
+
+  public static final String EXECUTE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/execute");
+  public static final String DELETE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/delete");
+
+  public static final String EXECUTE_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/execute");
+  public static final String DELETE_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/delete");
+
+  public static final String EXECUTE_CLEAN = String.format("%s/%s", BASE_URL, "clean/execute");
+  public static final String DELETE_CLEAN = String.format("%s/%s", BASE_URL, "clean/delete");
+
+  public static final String DATABASE_NAME_PARAM = "db_name";
+  public static final String TABLE_NAME_PARAM = "table_name";
+  public static final String BASEPATH_PARAM = "basepath";
+  public static final String INSTANT_PARAM = "instant";
+  public static final String USERNAME = "username";
+  public static final String CLUSTER = "cluster";
+  public static final String QUEUE = "queue";
+  public static final String RESOURCE = "resource";
+  public static final String PARALLELISM = "parallelism";
+  public static final String EXTRA_PARAMS = "extra_params";
+  public static final String EXECUTION_ENGINE = "execution_engine";

Review Comment:
   Could we use an enum to manage these text keys? Or have a request composer that templates and creates the request payload?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseTableServiceClient.java:
##########
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.TableServiceType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metrics.HoodieMetrics;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class BaseTableServiceClient<O> extends CommonHoodieClient {
+
+  private static final Logger LOG = LogManager.getLogger(BaseHoodieWriteClient.class);
+
+  protected BaseTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig, HoodieMetrics metrics) {
+    super(context, clientConfig, Option.empty());
+  }
+
+  protected boolean tableServicesEnabled(HoodieWriteConfig config) {
+    boolean enabled = config.areTableServicesEnabled();
+    if (!enabled) {
+      LOG.warn(String.format("Table services are disabled. Set `%s` to enable.", HoodieWriteConfig.TABLE_SERVICES_ENABLED));
+    }
+    return enabled;
+  }
+
+  protected boolean delegateToTableManagerService(HoodieWriteConfig config, ActionType actionType) {

Review Comment:
   How about useTableManagerService()? A name like delegateToXXX does not imply a boolean return value.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseTableServiceClient.java:
##########
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.avro.model.HoodieCleanMetadata;
+import org.apache.hudi.avro.model.HoodieCleanerPlan;
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieCompactionPlan;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.ActionType;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.TableServiceType;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CleanerUtils;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieClusteringConfig;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.metrics.HoodieMetrics;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+
+public abstract class BaseTableServiceClient<O> extends CommonHoodieClient {
+
+  private static final Logger LOG = LogManager.getLogger(BaseHoodieWriteClient.class);
+
+  protected BaseTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig, HoodieMetrics metrics) {
+    super(context, clientConfig, Option.empty());
+  }
+
+  protected boolean tableServicesEnabled(HoodieWriteConfig config) {
+    boolean enabled = config.areTableServicesEnabled();
+    if (!enabled) {
+      LOG.warn(String.format("Table services are disabled. Set `%s` to enable.", HoodieWriteConfig.TABLE_SERVICES_ENABLED));
+    }
+    return enabled;
+  }
+
+  protected boolean delegateToTableManagerService(HoodieWriteConfig config, ActionType actionType) {
+    boolean supportsAction = config.getTableManagerConfig().isTableManagerSupportsAction(actionType);
+    if (supportsAction) {
+      LOG.warn(actionType.name() + " delegate to table manager service!");

Review Comment:
   The log statement looks redundant here. If it is really needed, make it debug level.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -2160,6 +2167,13 @@ public boolean isMetastoreEnabled() {
     return metastoreConfig.enableMetastore();
   }
 
+  /**
+   * Table Manager configs.
+   */
+  public boolean isTableManagerEnabled() {
+    return tableManagerConfig.enableTableManager();

Review Comment:
   again let's make sure the name is finalized: Table Management Service, or Table Manager Service, or Table Manager?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.table.manager;
+
+import org.apache.hudi.common.config.HoodieTableManagerConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieRemoteException;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Client which send the table service instants to the table management service.
+ */
+public class HoodieTableManagerClient {
+
+  private static final String BASE_URL = "/v1/hoodie/service";
+
+  public static final String REGISTER_ENDPOINT = String.format("%s/%s", BASE_URL, "register");
+
+  public static final String EXECUTE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/execute");
+  public static final String DELETE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/delete");
+
+  public static final String EXECUTE_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/execute");
+  public static final String DELETE_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/delete");
+
+  public static final String EXECUTE_CLEAN = String.format("%s/%s", BASE_URL, "clean/execute");
+  public static final String DELETE_CLEAN = String.format("%s/%s", BASE_URL, "clean/delete");
+
+  public static final String DATABASE_NAME_PARAM = "db_name";
+  public static final String TABLE_NAME_PARAM = "table_name";
+  public static final String BASEPATH_PARAM = "basepath";
+  public static final String INSTANT_PARAM = "instant";
+  public static final String USERNAME = "username";
+  public static final String CLUSTER = "cluster";
+  public static final String QUEUE = "queue";
+  public static final String RESOURCE = "resource";
+  public static final String PARALLELISM = "parallelism";
+  public static final String EXTRA_PARAMS = "extra_params";
+  public static final String EXECUTION_ENGINE = "execution_engine";
+
+  private final HoodieTableManagerConfig config;
+  private final HoodieTableMetaClient metaClient;
+  private final String host;
+  private final int port;
+  private final String basePath;
+  private final String dbName;
+  private final String tableName;
+
+  private static final Logger LOG = LogManager.getLogger(HoodieTableManagerClient.class);
+
+  public HoodieTableManagerClient(HoodieTableMetaClient metaClient, HoodieTableManagerConfig config) {
+    this.basePath = metaClient.getBasePathV2().toString();
+    this.dbName = metaClient.getTableConfig().getDatabaseName();
+    this.tableName = metaClient.getTableConfig().getTableName();
+    this.host = config.getTableManagerHost();
+    this.port = config.getTableManagerPort();
+    this.config = config;
+    this.metaClient = metaClient;
+  }
+
+  private String executeRequest(String requestPath, Map<String, String> queryParameters) throws IOException {
+    URIBuilder builder =
+        new URIBuilder().setHost(host).setPort(port).setPath(requestPath).setScheme("http");
+    queryParameters.forEach(builder::addParameter);
+
+    String url = builder.toString();
+    LOG.info("Sending request to table management service : (" + url + ")");
+    Response response;
+    int timeout = this.config.getConnectionTimeout() * 1000; // msec
+    int requestRetryLimit = config.getConnectionRetryLimit();
+    int retry = 0;
+
+    while (retry < requestRetryLimit) {
+      try {
+        response = Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute();
+        return response.returnContent().asString();
+      } catch (IOException e) {
+        retry++;
+        LOG.warn(String.format("Failed request to server %s, will retry for %d times", url, requestRetryLimit - retry), e);
+        if (requestRetryLimit == retry) {
+          throw e;
+        }
+      }
+
+      try {
+        TimeUnit.SECONDS.sleep(config.getConnectionRetryDelay());
+      } catch (InterruptedException e) {
+        // ignore
+      }
+    }
+
+    throw new IOException(String.format("Failed request to table management service %s after retry %d times", url, requestRetryLimit));
+  }
+
+  private Map<String, String> getParamsWithAdditionalParams(String[] paramNames, String[] paramVals) {

Review Comment:
   Why is it called "AdditionalParams"? The method creates a new map and puts everything inside; it is just putting keys and values together. The name makes it sound like it is associated with EXTRA_PARAMS. Let's name it properly.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.table.manager;
+
+import org.apache.hudi.common.config.HoodieTableManagerConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieRemoteException;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Client which send the table service instants to the table management service.
+ */
+public class HoodieTableManagerClient {
+
+  private static final String BASE_URL = "/v1/hoodie/service";
+
+  public static final String REGISTER_ENDPOINT = String.format("%s/%s", BASE_URL, "register");
+
+  public static final String EXECUTE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/execute");
+  public static final String DELETE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/delete");
+
+  public static final String EXECUTE_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/execute");
+  public static final String DELETE_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/delete");
+
+  public static final String EXECUTE_CLEAN = String.format("%s/%s", BASE_URL, "clean/execute");
+  public static final String DELETE_CLEAN = String.format("%s/%s", BASE_URL, "clean/delete");
+
+  public static final String DATABASE_NAME_PARAM = "db_name";
+  public static final String TABLE_NAME_PARAM = "table_name";
+  public static final String BASEPATH_PARAM = "basepath";
+  public static final String INSTANT_PARAM = "instant";
+  public static final String USERNAME = "username";
+  public static final String CLUSTER = "cluster";
+  public static final String QUEUE = "queue";
+  public static final String RESOURCE = "resource";
+  public static final String PARALLELISM = "parallelism";
+  public static final String EXTRA_PARAMS = "extra_params";
+  public static final String EXECUTION_ENGINE = "execution_engine";
+
+  private final HoodieTableManagerConfig config;
+  private final HoodieTableMetaClient metaClient;
+  private final String host;
+  private final int port;
+  private final String basePath;
+  private final String dbName;
+  private final String tableName;
+
+  private static final Logger LOG = LogManager.getLogger(HoodieTableManagerClient.class);
+
+  public HoodieTableManagerClient(HoodieTableMetaClient metaClient, HoodieTableManagerConfig config) {
+    this.basePath = metaClient.getBasePathV2().toString();
+    this.dbName = metaClient.getTableConfig().getDatabaseName();
+    this.tableName = metaClient.getTableConfig().getTableName();
+    this.host = config.getTableManagerHost();
+    this.port = config.getTableManagerPort();
+    this.config = config;
+    this.metaClient = metaClient;
+  }
+
+  private String executeRequest(String requestPath, Map<String, String> queryParameters) throws IOException {
+    URIBuilder builder =
+        new URIBuilder().setHost(host).setPort(port).setPath(requestPath).setScheme("http");
+    queryParameters.forEach(builder::addParameter);
+
+    String url = builder.toString();
+    LOG.info("Sending request to table management service : (" + url + ")");
+    Response response;
+    int timeout = this.config.getConnectionTimeout() * 1000; // msec
+    int requestRetryLimit = config.getConnectionRetryLimit();
+    int retry = 0;
+
+    while (retry < requestRetryLimit) {

Review Comment:
   Can we make use of org.apache.hudi.common.util.RetryHelper here? Also, please enhance the helper wherever applicable.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.table.manager;
+
+import org.apache.hudi.common.config.HoodieTableManagerConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieRemoteException;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Client which send the table service instants to the table management service.
+ */
+public class HoodieTableManagerClient {
+
+  private static final String BASE_URL = "/v1/hoodie/service";
+
+  public static final String REGISTER_ENDPOINT = String.format("%s/%s", BASE_URL, "register");
+
+  public static final String EXECUTE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/execute");
+  public static final String DELETE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/delete");

Review Comment:
   "execute" sounds like a synchronous process, while we are actually requesting (async) right? it's more accurate with "request". Same problem with "delete". shouldn't it be "cancel"?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##########
@@ -159,9 +162,19 @@ protected Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
         LOG.error("Got exception when saving cleaner requested file", e);
         throw new HoodieIOException(e.getMessage(), e);
       }
-      return Option.of(cleanerPlan);
+      option = Option.of(cleanerPlan);
     }
-    return Option.empty();
+
+    if (config.getTableManagerConfig().isTableManagerSupportsAction(ActionType.clean)) {
+      delegateCleanExecutionToTableManager();
+    }

Review Comment:
   This reads like "as long as table manager supports this action, we delegate it", but internally it checks whether it is enabled or not. We should avoid this confusion.
   
   We should also make the logic flow cleanly separated: can you make the delegation return Option<HoodieCleanerPlan> and fully separated from the existing flow? if to delegate, then let the delegate do everything. This applies to other executor changes.



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieTableManagerConfig.java:
##########
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.config;
+
+import org.apache.hudi.common.model.ActionType;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.util.Properties;
+
+/**
+ * Configurations used by the HUDI Table Management Service.
+ */
+@Immutable
+@ConfigClassProperty(name = "Table Management Service Configs",
+    groupName = ConfigGroups.Names.WRITE_CLIENT,
+    description = "Configurations used by the Hudi Table Management Service.")
+public class HoodieTableManagerConfig extends HoodieConfig {
+
+  public static final String TABLE_MANAGEMENT_SERVICE_PREFIX = "hoodie.table.management.service";

Review Comment:
   name alignment, applies to all configs below



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.table.manager;
+
+import org.apache.hudi.common.config.HoodieTableManagerConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieRemoteException;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Client which send the table service instants to the table management service.
+ */
+public class HoodieTableManagerClient {
+
+  private static final String BASE_URL = "/v1/hoodie/service";
+
+  public static final String REGISTER_ENDPOINT = String.format("%s/%s", BASE_URL, "register");
+
+  public static final String EXECUTE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/execute");
+  public static final String DELETE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/delete");
+
+  public static final String EXECUTE_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/execute");
+  public static final String DELETE_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/delete");
+
+  public static final String EXECUTE_CLEAN = String.format("%s/%s", BASE_URL, "clean/execute");
+  public static final String DELETE_CLEAN = String.format("%s/%s", BASE_URL, "clean/delete");

Review Comment:
   The endpoint URL pattern looks more like RPC than REST. If the design decides to go with REST, then I think the action itself, like execute/delete, should be part of the payload. If we go with RPC, this is fine.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org