Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/06/28 20:58:44 UTC

[GitHub] [hudi] vinothchandar commented on a diff in pull request #5681: [HUDI-4148] Preparations and client for hudi table manager service

vinothchandar commented on code in PR #5681:
URL: https://github.com/apache/hudi/pull/5681#discussion_r908895076


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.table.manager;
+
+import org.apache.hudi.common.config.HoodieTableManagerConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class HoodieTableManagerClient {
+
+  private static final String BASE_URL = "/v1/hoodie/serivce";

Review Comment:
   /v1/hudi?
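   A possible shape for that rename, assuming the service keeps the same path layout (it would also drop the `serivce` typo in the current constant):

   ```java
   // Hypothetical rename following the suggestion above; the exact path is an assumption.
   private static final String BASE_URL = "/v1/hudi";
   ```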



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -1903,6 +1906,10 @@ public HoodieMetadataConfig getMetadataConfig() {
     return metadataConfig;
   }
 
+  public HoodieTableManagerConfig getTableManagerConfig() {

Review Comment:
   Should we have this as a separate module that is built on top of `hudi-client-common`?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##########
@@ -152,9 +156,26 @@ protected Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
         LOG.error("Got exception when saving cleaner requested file", e);
         throw new HoodieIOException(e.getMessage(), e);
       }
-      return Option.of(cleanerPlan);
+      option = Option.of(cleanerPlan);
     }
-    return Option.empty();
+
+    if (config.isTableManagerEnabled() && config.getTableManagerConfig().getTableManagerActions().contains(ActionType.clean.name())) {
+      submitCleanToService();
+    }
+
+    return option;
+  }
+
+  private void submitCleanToService() {
+    HoodieTableMetaClient metaClient = table.getMetaClient();
+    List<String> instantsToSubmit = metaClient.getActiveTimeline()

Review Comment:
   Hmmm. Based on this, do you intend to have the scheduling still done by the writer, and only move execution to the table manager service?
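   For reference, a plain-Java sketch (no Hudi types) of the split the diff above implies: the writer still creates and persists the cleaner plan locally, and only the execution request goes to the service. All names here are illustrative.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class SchedulingSplitSketch {

     // Mirrors HoodieTableManagerClient#submitClean from the diff.
     interface TableManagerService {
       void submitClean(List<String> requestedInstants);
     }

     static class Writer {
       private final List<String> requestedCleanInstants = new ArrayList<>();

       // Scheduling stays on the writer: build the plan, record a "requested" instant.
       void scheduleClean(String instantTime) {
         requestedCleanInstants.add(instantTime);
       }

       // Execution is delegated: only instant timestamps are shipped to the service.
       void delegateExecution(TableManagerService service) {
         service.submitClean(new ArrayList<>(requestedCleanInstants));
       }
     }

     public static void main(String[] args) {
       Writer writer = new Writer();
       writer.scheduleClean("20220628205844");
       writer.delegateExecution(instants ->
           System.out.println("Would POST " + instants + " to /clean/submit"));
     }
   }
   ```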



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java:
##########
@@ -236,6 +237,11 @@ private HoodieCleanMetadata runClean(HoodieTable<T, I, K, O> table, HoodieInstan
 
   @Override
   public HoodieCleanMetadata execute() {
+    if (config.isTableManagerEnabled() && config.getTableManagerConfig().getTableManagerActions().contains(ActionType.clean.name())) {
+      LOG.warn("Compaction delegate to table management service, do not clean for client!");

Review Comment:
   This is cleaning, not compaction? Can we double-check all log messages like this one.
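   A possible wording for `CleanActionExecutor`, assuming only the message needs to change:

   ```java
   // Corrected message (the current one says "Compaction" inside the clean executor):
   LOG.warn("Clean is delegated to the table management service; skipping clean on the client!");
   ```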



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.table.manager;
+
+import org.apache.hudi.common.config.HoodieTableManagerConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class HoodieTableManagerClient {
+
+  private static final String BASE_URL = "/v1/hoodie/serivce";
+
+  public static final String REGISTER = String.format("%s/%s", BASE_URL, "register");
+
+  public static final String SUBMIT_COMPACTION = String.format("%s/%s", BASE_URL, "compact/submit");
+  public static final String REMOVE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/remove");
+
+  public static final String SUBMIT_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/submit");
+  public static final String REMOVE_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/remove");
+
+  public static final String SUBMIT_CLEAN = String.format("%s/%s", BASE_URL, "clean/submit");
+  public static final String REMOVE_CLEAN = String.format("%s/%s", BASE_URL, "clean/remove");
+
+  public static final String DATABASE_NAME_PARAM = "db_name";
+  public static final String TABLE_NAME_PARAM = "table_name";
+  public static final String BASEPATH_PARAM = "basepath";
+  public static final String INSTANT_PARAM = "instant";
+  public static final String OWNER = "owner";
+  public static final String CLUSTER = "cluster";
+  public static final String QUEUE = "queue";
+  public static final String RESOURCE = "resource";
+  public static final String PARALLELISM = "parallelism";
+  public static final String EXTRA_PARAMS = "extra_params";
+  public static final String EXECUTION_ENGINE = "execution_engine";
+
+  private final HoodieTableManagerConfig config;
+  private final String host;
+  private final int port;
+  private final String basePath;
+  private final String dbName;
+  private final String tableName;
+
+  private static int failTimes;
+
+  private static final Logger LOG = LogManager.getLogger(HoodieTableManagerClient.class);
+
+  public HoodieTableManagerClient(HoodieTableMetaClient metaClient, HoodieTableManagerConfig config) {
+    this.basePath = metaClient.getBasePathV2().toString();
+    this.dbName = metaClient.getTableConfig().getDatabaseName();
+    this.tableName = metaClient.getTableConfig().getTableName();
+    this.host = config.getTableManagerHost();
+    this.port = config.getTableManagerPort();
+    this.config = config;
+  }
+
+  private String executeRequest(String requestPath, String instantRange) throws IOException {
+    URIBuilder builder =
+        new URIBuilder().setHost(host).setPort(port).setPath(requestPath).setScheme("http");
+
+    Map<String, String> queryParameters = getParamsWithAdditionalParams(
+        new String[] {DATABASE_NAME_PARAM, TABLE_NAME_PARAM, INSTANT_PARAM, OWNER, QUEUE, RESOURCE, PARALLELISM, EXTRA_PARAMS},
+        new String[] {dbName, tableName, instantRange, config.getDeployUsername(), config.getDeployQueue(),
+            config.getDeployResource(), String.valueOf(config.getDeployParallelism()), config.getDeployExtraParams()});
+    queryParameters.forEach(builder::addParameter);
+
+    String url = builder.toString();
+    LOG.info("Sending request to table management service : (" + url + ")");
+    Response response;
+    int timeout = this.config.getConnectionTimeout() * 1000; // msec
+    int requestRetryLimit = config.getConnectionRetryLimit();
+    int retry = 0;
+
+    while (retry < requestRetryLimit) {
+      try {
+        response = Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute();
+        return response.returnContent().asString();
+      } catch (IOException e) {
+        retry++;
+        LOG.warn(String.format("Failed request to server %s, will retry for %d times", url, requestRetryLimit - retry), e);
+        if (requestRetryLimit == retry) {
+          throw e;
+        }
+      }
+
+      try {
+        TimeUnit.SECONDS.sleep(config.getConnectionRetryDelay());
+      } catch (InterruptedException e) {
+        // ignore
+      }
+    }
+
+    throw new IOException(String.format("Failed request to table management service %s after retry %d times", url, requestRetryLimit));
+  }
+
+  private Map<String, String> getParamsWithAdditionalParams(String[] paramNames, String[] paramVals) {
+    Map<String, String> paramsMap = new HashMap<>();
+    paramsMap.put(BASEPATH_PARAM, basePath);
+    ValidationUtils.checkArgument(paramNames.length == paramVals.length);
+    for (int i = 0; i < paramNames.length; i++) {
+      paramsMap.put(paramNames[i], paramVals[i]);
+    }
+    return paramsMap;
+  }
+
+  public void submitCompaction(List<String> instantsToSubmit) {
+    try {
+      executeRequest(SUBMIT_COMPACTION, StringUtils.join(instantsToSubmit.toArray(new String[0]), ","));
+    } catch (Exception e) {
+      LOG.error("Failed to schedule compaction to service", e);
+      failTimes++;
+      checkTolerableSchedule(e);
+      return;
+    }
+
+    failTimes = 0;
+  }
+
+  public void submitClean(List<String> instantsToSubmit) {
+    try {
+      executeRequest(SUBMIT_CLEAN, StringUtils.join(instantsToSubmit.toArray(new String[0]), ","));
+    } catch (Exception e) {
+      LOG.error("Failed to schedule clean to service", e);

Review Comment:
   Can we reuse all this code around exception handling? There are ~5 duplicated lines across the submitXXX methods.
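   One way to fold that duplication into a single helper; the helper name is an assumption, while everything it calls appears in the diff above:

   ```java
   // Hypothetical shared helper; each submitXXX method becomes a one-liner.
   private void submit(String endpoint, String actionName, List<String> instantsToSubmit) {
     try {
       executeRequest(endpoint, StringUtils.join(instantsToSubmit.toArray(new String[0]), ","));
       failTimes = 0;
     } catch (Exception e) {
       failTimes++;
       LOG.error("Failed to schedule " + actionName + " to service", e);
       checkTolerableSchedule(e);
     }
   }

   public void submitCompaction(List<String> instantsToSubmit) {
     submit(SUBMIT_COMPACTION, "compaction", instantsToSubmit);
   }

   public void submitClean(List<String> instantsToSubmit) {
     submit(SUBMIT_CLEAN, "clean", instantsToSubmit);
   }

   public void submitClustering(List<String> instantsToSubmit) {
     submit(SUBMIT_CLUSTERING, "clustering", instantsToSubmit);
   }
   ```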



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/cluster/SparkExecuteClusteringCommitActionExecutor.java:
##########
@@ -50,6 +55,13 @@ public SparkExecuteClusteringCommitActionExecutor(HoodieEngineContext context,
 
   @Override
   public HoodieWriteMetadata<HoodieData<WriteStatus>> execute() {
+    if (config.isTableManagerEnabled() && config.getTableManagerConfig().getTableManagerActions().contains(ActionType.replacecommit.name())) {
+      LOG.warn("Clustering delegate to table management service, do not cluster for client!");

Review Comment:
   Can we avoid invoking this code path at the client level altogether, rather than returning an empty writeMetadata object?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.table.manager;
+
+import org.apache.hudi.common.config.HoodieTableManagerConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class HoodieTableManagerClient {
+
+  private static final String BASE_URL = "/v1/hoodie/serivce";
+
+  public static final String REGISTER = String.format("%s/%s", BASE_URL, "register");
+
+  public static final String SUBMIT_COMPACTION = String.format("%s/%s", BASE_URL, "compact/submit");
+  public static final String REMOVE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/remove");
+
+  public static final String SUBMIT_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/submit");
+  public static final String REMOVE_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/remove");
+
+  public static final String SUBMIT_CLEAN = String.format("%s/%s", BASE_URL, "clean/submit");
+  public static final String REMOVE_CLEAN = String.format("%s/%s", BASE_URL, "clean/remove");
+
+  public static final String DATABASE_NAME_PARAM = "db_name";
+  public static final String TABLE_NAME_PARAM = "table_name";
+  public static final String BASEPATH_PARAM = "basepath";
+  public static final String INSTANT_PARAM = "instant";
+  public static final String OWNER = "owner";
+  public static final String CLUSTER = "cluster";
+  public static final String QUEUE = "queue";
+  public static final String RESOURCE = "resource";
+  public static final String PARALLELISM = "parallelism";
+  public static final String EXTRA_PARAMS = "extra_params";
+  public static final String EXECUTION_ENGINE = "execution_engine";
+
+  private final HoodieTableManagerConfig config;
+  private final String host;
+  private final int port;
+  private final String basePath;
+  private final String dbName;
+  private final String tableName;
+
+  private static int failTimes;

Review Comment:
   Did you intend this to be static? All usage seems to be non-static, and it's private anyway.
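   If the intent is per-client tracking, the field could simply become an instance member:

   ```java
   // Instance-level counter instead of a class-wide static one:
   private int failTimes;
   ```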



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.table.manager;
+
+import org.apache.hudi.common.config.HoodieTableManagerConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class HoodieTableManagerClient {
+
+  private static final String BASE_URL = "/v1/hoodie/serivce";
+
+  public static final String REGISTER = String.format("%s/%s", BASE_URL, "register");
+
+  public static final String SUBMIT_COMPACTION = String.format("%s/%s", BASE_URL, "compact/submit");
+  public static final String REMOVE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/remove");
+
+  public static final String SUBMIT_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/submit");
+  public static final String REMOVE_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/remove");
+
+  public static final String SUBMIT_CLEAN = String.format("%s/%s", BASE_URL, "clean/submit");
+  public static final String REMOVE_CLEAN = String.format("%s/%s", BASE_URL, "clean/remove");
+
+  public static final String DATABASE_NAME_PARAM = "db_name";
+  public static final String TABLE_NAME_PARAM = "table_name";
+  public static final String BASEPATH_PARAM = "basepath";
+  public static final String INSTANT_PARAM = "instant";
+  public static final String OWNER = "owner";
+  public static final String CLUSTER = "cluster";
+  public static final String QUEUE = "queue";
+  public static final String RESOURCE = "resource";
+  public static final String PARALLELISM = "parallelism";
+  public static final String EXTRA_PARAMS = "extra_params";
+  public static final String EXECUTION_ENGINE = "execution_engine";
+
+  private final HoodieTableManagerConfig config;
+  private final String host;
+  private final int port;
+  private final String basePath;
+  private final String dbName;
+  private final String tableName;
+
+  private static int failTimes;
+
+  private static final Logger LOG = LogManager.getLogger(HoodieTableManagerClient.class);
+
+  public HoodieTableManagerClient(HoodieTableMetaClient metaClient, HoodieTableManagerConfig config) {
+    this.basePath = metaClient.getBasePathV2().toString();
+    this.dbName = metaClient.getTableConfig().getDatabaseName();
+    this.tableName = metaClient.getTableConfig().getTableName();
+    this.host = config.getTableManagerHost();
+    this.port = config.getTableManagerPort();
+    this.config = config;
+  }
+
+  private String executeRequest(String requestPath, String instantRange) throws IOException {
+    URIBuilder builder =
+        new URIBuilder().setHost(host).setPort(port).setPath(requestPath).setScheme("http");
+
+    Map<String, String> queryParameters = getParamsWithAdditionalParams(
+        new String[] {DATABASE_NAME_PARAM, TABLE_NAME_PARAM, INSTANT_PARAM, OWNER, QUEUE, RESOURCE, PARALLELISM, EXTRA_PARAMS},
+        new String[] {dbName, tableName, instantRange, config.getDeployUsername(), config.getDeployQueue(),
+            config.getDeployResource(), String.valueOf(config.getDeployParallelism()), config.getDeployExtraParams()});
+    queryParameters.forEach(builder::addParameter);
+
+    String url = builder.toString();
+    LOG.info("Sending request to table management service : (" + url + ")");
+    Response response;
+    int timeout = this.config.getConnectionTimeout() * 1000; // msec
+    int requestRetryLimit = config.getConnectionRetryLimit();
+    int retry = 0;
+
+    while (retry < requestRetryLimit) {
+      try {
+        response = Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute();
+        return response.returnContent().asString();
+      } catch (IOException e) {
+        retry++;
+        LOG.warn(String.format("Failed request to server %s, will retry for %d times", url, requestRetryLimit - retry), e);
+        if (requestRetryLimit == retry) {
+          throw e;
+        }
+      }
+
+      try {
+        TimeUnit.SECONDS.sleep(config.getConnectionRetryDelay());
+      } catch (InterruptedException e) {
+        // ignore
+      }
+    }
+
+    throw new IOException(String.format("Failed request to table management service %s after retry %d times", url, requestRetryLimit));
+  }
+
+  private Map<String, String> getParamsWithAdditionalParams(String[] paramNames, String[] paramVals) {
+    Map<String, String> paramsMap = new HashMap<>();
+    paramsMap.put(BASEPATH_PARAM, basePath);
+    ValidationUtils.checkArgument(paramNames.length == paramVals.length);
+    for (int i = 0; i < paramNames.length; i++) {
+      paramsMap.put(paramNames[i], paramVals[i]);
+    }
+    return paramsMap;
+  }
+
+  public void submitCompaction(List<String> instantsToSubmit) {
+    try {
+      executeRequest(SUBMIT_COMPACTION, StringUtils.join(instantsToSubmit.toArray(new String[0]), ","));
+    } catch (Exception e) {
+      LOG.error("Failed to schedule compaction to service", e);
+      failTimes++;
+      checkTolerableSchedule(e);
+      return;
+    }
+
+    failTimes = 0;
+  }
+
+  public void submitClean(List<String> instantsToSubmit) {
+    try {
+      executeRequest(SUBMIT_CLEAN, StringUtils.join(instantsToSubmit.toArray(new String[0]), ","));
+    } catch (Exception e) {
+      LOG.error("Failed to schedule clean to service", e);
+      failTimes++;
+      checkTolerableSchedule(e);
+      return;
+    }
+
+    failTimes = 0;
+  }
+
+  public void submitClustering(List<String> instantsToSubmit) {
+    try {
+      executeRequest(SUBMIT_CLUSTERING, StringUtils.join(instantsToSubmit.toArray(new String[0]), ","));
+    } catch (Exception e) {
+      LOG.error("Failed to schedule clustering to service", e);
+      failTimes++;
+      checkTolerableSchedule(e);
+      return;
+    }
+
+    failTimes = 0;
+  }
+
+  private void checkTolerableSchedule(Exception e) {

Review Comment:
   Overall, I wonder if there is a simpler way of implementing retryable requests.
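   For what it's worth, a minimal generic retry wrapper could keep `executeRequest` down to a single call. This is a sketch with illustrative names, not an existing Hudi utility:

   ```java
   import java.io.IOException;
   import java.util.concurrent.TimeUnit;

   // Sketch of a reusable retry wrapper (illustrative names, not an existing Hudi API).
   final class RetrySketch {

     @FunctionalInterface
     interface CheckedSupplier<T> {
       T get() throws IOException;
     }

     static <T> T withRetries(CheckedSupplier<T> action, int maxAttempts, long delaySeconds)
         throws IOException {
       IOException last = new IOException("withRetries called with maxAttempts=" + maxAttempts);
       for (int attempt = 1; attempt <= maxAttempts; attempt++) {
         try {
           return action.get();
         } catch (IOException e) {
           last = e;
           if (attempt == maxAttempts) {
             break; // out of attempts, rethrow below
           }
           try {
             TimeUnit.SECONDS.sleep(delaySeconds);
           } catch (InterruptedException ie) {
             Thread.currentThread().interrupt();
             break;
           }
         }
       }
       throw last;
     }
   }
   ```

   `executeRequest` would then reduce to `return RetrySketch.withRetries(() -> Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute().returnContent().asString(), requestRetryLimit, config.getConnectionRetryDelay());`.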



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.table.manager;
+
+import org.apache.hudi.common.config.HoodieTableManagerConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class HoodieTableManagerClient {
+
+  private static final String BASE_URL = "/v1/hoodie/serivce";
+
+  public static final String REGISTER = String.format("%s/%s", BASE_URL, "register");

Review Comment:
   Add an `_ENDPOINT` suffix to the names, so it's clear what these are?
   
   `REGISTER_ENDPOINT` 
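   i.e. something like this (the remaining constants would follow the same pattern):

   ```java
   public static final String REGISTER_ENDPOINT = String.format("%s/%s", BASE_URL, "register");
   public static final String SUBMIT_COMPACTION_ENDPOINT = String.format("%s/%s", BASE_URL, "compact/submit");
   ```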



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##########
@@ -152,9 +156,26 @@ protected Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
         LOG.error("Got exception when saving cleaner requested file", e);
         throw new HoodieIOException(e.getMessage(), e);
       }
-      return Option.of(cleanerPlan);
+      option = Option.of(cleanerPlan);
     }
-    return Option.empty();
+
+    if (config.isTableManagerEnabled() && config.getTableManagerConfig().getTableManagerActions().contains(ActionType.clean.name())) {

Review Comment:
   We can then instruct the users to set `hoodie.enable.table.services=false` when running with the table management service. Another point here is that we should check for some lock manager being configured when operating in this mode, so that there is no race between delta commits and compaction scheduling.
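   A sketch of the writer-side settings this would imply; `hoodie.enable.table.services` is the key proposed in this thread (not an existing config), and the lock-related values are only illustrative:

   ```java
   import java.util.Properties;

   public class TableManagerWriterSettings {
     // Writer-side properties when the table management service owns the table services.
     public static Properties build() {
       Properties props = new Properties();
       // Proposed flag from this review thread; not an existing config key.
       props.setProperty("hoodie.enable.table.services", "false");
       // Guard against races between delta commits and service-side compaction scheduling
       // by requiring a lock provider (values here are illustrative).
       props.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
       props.setProperty("hoodie.write.lock.provider",
           "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");
       return props;
     }
   }
   ```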



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanActionExecutor.java:
##########
@@ -152,9 +156,26 @@ protected Option<HoodieCleanerPlan> requestClean(String startCleanTime) {
         LOG.error("Got exception when saving cleaner requested file", e);
         throw new HoodieIOException(e.getMessage(), e);
       }
-      return Option.of(cleanerPlan);
+      option = Option.of(cleanerPlan);
     }
-    return Option.empty();
+
+    if (config.isTableManagerEnabled() && config.getTableManagerConfig().getTableManagerActions().contains(ActionType.clean.name())) {

Review Comment:
   Instead of leaking the table management service into the action executors, can we layer it on top? Introduce a higher-level write config, say `hoodie.enable.table.services` (default: `true`), which, when set to `false`, does not do any scheduling/execution of any table services. This check can be added at the BaseHoodieWriteClient level when the calls are made for clean, scheduleClean, compact, scheduleCompact, etc.
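   A rough shape for that flag inside `HoodieWriteConfig`; the accessor name and documentation wording are assumptions:

   ```java
   import org.apache.hudi.common.config.ConfigProperty;

   // Hypothetical addition to HoodieWriteConfig, following the proposed key name.
   public static final ConfigProperty<Boolean> TABLE_SERVICES_ENABLED = ConfigProperty
       .key("hoodie.enable.table.services")
       .defaultValue(true)
       .withDocumentation("When set to false, the writer neither schedules nor executes any "
           + "table service (clean, compact, cluster); an external manager is expected to own them.");

   // Hypothetical accessor used by the guards in BaseHoodieWriteClient.
   public boolean areTableServicesEnabled() {
     return getBoolean(TABLE_SERVICES_ENABLED);
   }
   ```

   The clean, scheduleClean, compact, scheduleCompact, cluster entry points would then return early when this returns false, and the action executors stay untouched.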



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.table.manager;
+
+import org.apache.hudi.common.config.HoodieTableManagerConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class HoodieTableManagerClient {
+
+  private static final String BASE_URL = "/v1/hoodie/serivce";
+
+  public static final String REGISTER = String.format("%s/%s", BASE_URL, "register");
+
+  public static final String SUBMIT_COMPACTION = String.format("%s/%s", BASE_URL, "compact/submit");
+  public static final String REMOVE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/remove");
+
+  public static final String SUBMIT_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/submit");
+  public static final String REMOVE_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/remove");
+
+  public static final String SUBMIT_CLEAN = String.format("%s/%s", BASE_URL, "clean/submit");
+  public static final String REMOVE_CLEAN = String.format("%s/%s", BASE_URL, "clean/remove");
+
+  public static final String DATABASE_NAME_PARAM = "db_name";
+  public static final String TABLE_NAME_PARAM = "table_name";
+  public static final String BASEPATH_PARAM = "basepath";
+  public static final String INSTANT_PARAM = "instant";
+  public static final String OWNER = "owner";
+  public static final String CLUSTER = "cluster";
+  public static final String QUEUE = "queue";
+  public static final String RESOURCE = "resource";
+  public static final String PARALLELISM = "parallelism";
+  public static final String EXTRA_PARAMS = "extra_params";
+  public static final String EXECUTION_ENGINE = "execution_engine";
+
+  private final HoodieTableManagerConfig config;
+  private final String host;
+  private final int port;
+  private final String basePath;
+  private final String dbName;
+  private final String tableName;
+
+  private static int failTimes;

Review Comment:
   And this is assuming there is only one request in flight, since all submitXXX methods update this variable. Can we either make this local to the submitXXX methods or make all of those methods synchronized, so that any multi-threaded usage is not surprising?
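   One option, keeping the reset/increment logic from the diff but making the counter a per-instance atomic (the `synchronized` route mentioned above would work equally well):

   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   // Per-instance atomic counter so concurrent submitXXX calls don't race.
   private final AtomicInteger failTimes = new AtomicInteger(0);

   public void submitClean(List<String> instantsToSubmit) {
     try {
       executeRequest(SUBMIT_CLEAN, StringUtils.join(instantsToSubmit.toArray(new String[0]), ","));
       failTimes.set(0);
     } catch (Exception e) {
       failTimes.incrementAndGet();
       LOG.error("Failed to schedule clean to service", e);
       checkTolerableSchedule(e);
     }
   }
   ```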



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.table.manager;
+
+import org.apache.hudi.common.config.HoodieTableManagerConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class HoodieTableManagerClient {
+
+  private static final String BASE_URL = "/v1/hoodie/serivce";
+
+  public static final String REGISTER = String.format("%s/%s", BASE_URL, "register");
+
+  public static final String SUBMIT_COMPACTION = String.format("%s/%s", BASE_URL, "compact/submit");
+  public static final String REMOVE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/remove");
+
+  public static final String SUBMIT_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/submit");
+  public static final String REMOVE_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/remove");
+
+  public static final String SUBMIT_CLEAN = String.format("%s/%s", BASE_URL, "clean/submit");
+  public static final String REMOVE_CLEAN = String.format("%s/%s", BASE_URL, "clean/remove");
+
+  public static final String DATABASE_NAME_PARAM = "db_name";
+  public static final String TABLE_NAME_PARAM = "table_name";
+  public static final String BASEPATH_PARAM = "basepath";
+  public static final String INSTANT_PARAM = "instant";
+  public static final String OWNER = "owner";
+  public static final String CLUSTER = "cluster";
+  public static final String QUEUE = "queue";
+  public static final String RESOURCE = "resource";
+  public static final String PARALLELISM = "parallelism";
+  public static final String EXTRA_PARAMS = "extra_params";
+  public static final String EXECUTION_ENGINE = "execution_engine";
+
+  private final HoodieTableManagerConfig config;
+  private final String host;
+  private final int port;
+  private final String basePath;
+  private final String dbName;
+  private final String tableName;
+
+  private static int failTimes;
+
+  private static final Logger LOG = LogManager.getLogger(HoodieTableManagerClient.class);

Review Comment:
   Probably needs a rebase onto log4j2 now.
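   The Log4j2 equivalent keeps the same `getLogger(Class)` call shape:

   ```java
   import org.apache.logging.log4j.LogManager;
   import org.apache.logging.log4j.Logger;

   private static final Logger LOG = LogManager.getLogger(HoodieTableManagerClient.class);
   ```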



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.table.manager;
+
+import org.apache.hudi.common.config.HoodieTableManagerConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class HoodieTableManagerClient {
+
+  private static final String BASE_URL = "/v1/hoodie/serivce";
+
+  public static final String REGISTER = String.format("%s/%s", BASE_URL, "register");
+
+  public static final String SUBMIT_COMPACTION = String.format("%s/%s", BASE_URL, "compact/submit");
+  public static final String REMOVE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/remove");
+
+  public static final String SUBMIT_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/submit");
+  public static final String REMOVE_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/remove");
+
+  public static final String SUBMIT_CLEAN = String.format("%s/%s", BASE_URL, "clean/submit");
+  public static final String REMOVE_CLEAN = String.format("%s/%s", BASE_URL, "clean/remove");
+
+  public static final String DATABASE_NAME_PARAM = "db_name";
+  public static final String TABLE_NAME_PARAM = "table_name";
+  public static final String BASEPATH_PARAM = "basepath";
+  public static final String INSTANT_PARAM = "instant";
+  public static final String OWNER = "owner";
+  public static final String CLUSTER = "cluster";
+  public static final String QUEUE = "queue";
+  public static final String RESOURCE = "resource";
+  public static final String PARALLELISM = "parallelism";
+  public static final String EXTRA_PARAMS = "extra_params";
+  public static final String EXECUTION_ENGINE = "execution_engine";
+
+  private final HoodieTableManagerConfig config;
+  private final String host;
+  private final int port;
+  private final String basePath;
+  private final String dbName;
+  private final String tableName;
+
+  private static int failTimes;
+
+  private static final Logger LOG = LogManager.getLogger(HoodieTableManagerClient.class);
+
+  public HoodieTableManagerClient(HoodieTableMetaClient metaClient, HoodieTableManagerConfig config) {
+    this.basePath = metaClient.getBasePathV2().toString();
+    this.dbName = metaClient.getTableConfig().getDatabaseName();
+    this.tableName = metaClient.getTableConfig().getTableName();
+    this.host = config.getTableManagerHost();
+    this.port = config.getTableManagerPort();
+    this.config = config;
+  }
+
+  private String executeRequest(String requestPath, String instantRange) throws IOException {
+    URIBuilder builder =
+        new URIBuilder().setHost(host).setPort(port).setPath(requestPath).setScheme("http");
+
+    Map<String, String> queryParameters = getParamsWithAdditionalParams(
+        new String[] {DATABASE_NAME_PARAM, TABLE_NAME_PARAM, INSTANT_PARAM, OWNER, QUEUE, RESOURCE, PARALLELISM, EXTRA_PARAMS},
+        new String[] {dbName, tableName, instantRange, config.getDeployUsername(), config.getDeployQueue(),
+            config.getDeployResource(), String.valueOf(config.getDeployParallelism()), config.getDeployExtraParams()});
+    queryParameters.forEach(builder::addParameter);
+
+    String url = builder.toString();
+    LOG.info("Sending request to table management service : (" + url + ")");
+    Response response;
+    int timeout = this.config.getConnectionTimeout() * 1000; // msec
+    int requestRetryLimit = config.getConnectionRetryLimit();
+    int retry = 0;
+
+    while (retry < requestRetryLimit) {
+      try {
+        response = Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute();
+        return response.returnContent().asString();
+      } catch (IOException e) {
+        retry++;
+        LOG.warn(String.format("Failed request to server %s, will retry for %d times", url, requestRetryLimit - retry), e);
+        if (requestRetryLimit == retry) {
+          throw e;
+        }
+      }
+
+      try {
+        TimeUnit.SECONDS.sleep(config.getConnectionRetryDelay());
+      } catch (InterruptedException e) {
+        // ignore
+      }
+    }
+
+    throw new IOException(String.format("Failed request to table management service %s after retry %d times", url, requestRetryLimit));
+  }
+
+  private Map<String, String> getParamsWithAdditionalParams(String[] paramNames, String[] paramVals) {
+    Map<String, String> paramsMap = new HashMap<>();
+    paramsMap.put(BASEPATH_PARAM, basePath);
+    ValidationUtils.checkArgument(paramNames.length == paramVals.length);
+    for (int i = 0; i < paramNames.length; i++) {
+      paramsMap.put(paramNames[i], paramVals[i]);
+    }
+    return paramsMap;
+  }
+
+  public void submitCompaction(List<String> instantsToSubmit) {
+    try {
+      executeRequest(SUBMIT_COMPACTION, StringUtils.join(instantsToSubmit.toArray(new String[0]), ","));
+    } catch (Exception e) {
+      LOG.error("Failed to schedule compaction to service", e);
+      failTimes++;
+      checkTolerableSchedule(e);
+      return;
+    }
+
+    failTimes = 0;
+  }
+
+  public void submitClean(List<String> instantsToSubmit) {
+    try {
+      executeRequest(SUBMIT_CLEAN, StringUtils.join(instantsToSubmit.toArray(new String[0]), ","));
+    } catch (Exception e) {
+      LOG.error("Failed to schedule clean to service", e);
+      failTimes++;
+      checkTolerableSchedule(e);
+      return;
+    }
+
+    failTimes = 0;
+  }
+
+  public void submitClustering(List<String> instantsToSubmit) {
+    try {
+      executeRequest(SUBMIT_CLUSTERING, StringUtils.join(instantsToSubmit.toArray(new String[0]), ","));
+    } catch (Exception e) {
+      LOG.error("Failed to schedule clustering to service", e);
+      failTimes++;
+      checkTolerableSchedule(e);
+      return;
+    }
+
+    failTimes = 0;
+  }
+
+  private void checkTolerableSchedule(Exception e) {
+    if (failTimes > config.getConnectionTolerableNum()) {

Review Comment:
   Include the URL that fails, so we have context?
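   e.g. by threading the endpoint (or full request URL) through from the submit path; the exception type and wording below are assumptions:

   ```java
   // Hypothetical variant that carries the failing endpoint for context.
   private void checkTolerableSchedule(String requestPath, Exception e) {
     if (failTimes > config.getConnectionTolerableNum()) {
       throw new HoodieException(
           "Too many failed submissions to table management service endpoint " + requestPath, e);
     }
   }
   ```

   Callers would then pass `SUBMIT_CLEAN`, `SUBMIT_COMPACTION`, etc. along with the exception.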



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org