Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2022/07/13 06:34:14 UTC

[GitHub] [hudi] suryaprasanna commented on a diff in pull request #5681: [HUDI-4148] Preparations and client for hudi table manager service

suryaprasanna commented on code in PR #5681:
URL: https://github.com/apache/hudi/pull/5681#discussion_r919667071


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java:
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.table.manager;
+
+import org.apache.hudi.common.config.HoodieTableManagerConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class HoodieTableManagerClient {
+
+  private static final String BASE_URL = "/v1/hoodie/serivce";

Review Comment:
   Please fix the typo in the word "service".
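   
   For reference, the corrected constant would read:
   
       private static final String BASE_URL = "/v1/hoodie/service";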



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/table/manager/HoodieTableManagerClient.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.table.manager;
+
+import org.apache.hudi.common.config.HoodieTableManagerConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.ClusteringUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieRemoteException;
+
+import org.apache.http.client.fluent.Request;
+import org.apache.http.client.fluent.Response;
+import org.apache.http.client.utils.URIBuilder;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Client that sends table service instants to the table management service.
+ */
+public class HoodieTableManagerClient {
+
+  private static final String BASE_URL = "/v1/hoodie/serivce";
+
+  public static final String REGISTER_ENDPOINT = String.format("%s/%s", BASE_URL, "register");
+
+  public static final String SUBMIT_COMPACTION = String.format("%s/%s", BASE_URL, "compact/submit");
+  public static final String REMOVE_COMPACTION = String.format("%s/%s", BASE_URL, "compact/remove");
+
+  public static final String SUBMIT_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/submit");
+  public static final String REMOVE_CLUSTERING = String.format("%s/%s", BASE_URL, "cluster/remove");
+
+  public static final String SUBMIT_CLEAN = String.format("%s/%s", BASE_URL, "clean/submit");
+  public static final String REMOVE_CLEAN = String.format("%s/%s", BASE_URL, "clean/remove");
+
+  public static final String DATABASE_NAME_PARAM = "db_name";
+  public static final String TABLE_NAME_PARAM = "table_name";
+  public static final String BASEPATH_PARAM = "basepath";
+  public static final String INSTANT_PARAM = "instant";
+  public static final String USERNAME = "username";
+  public static final String CLUSTER = "cluster";
+  public static final String QUEUE = "queue";
+  public static final String RESOURCE = "resource";
+  public static final String PARALLELISM = "parallelism";
+  public static final String EXTRA_PARAMS = "extra_params";
+  public static final String EXECUTION_ENGINE = "execution_engine";
+
+  private final HoodieTableManagerConfig config;
+  private final HoodieTableMetaClient metaClient;
+  private final String host;
+  private final int port;
+  private final String basePath;
+  private final String dbName;
+  private final String tableName;
+
+  private static final Logger LOG = LogManager.getLogger(HoodieTableManagerClient.class);
+
+  public HoodieTableManagerClient(HoodieTableMetaClient metaClient, HoodieTableManagerConfig config) {
+    this.basePath = metaClient.getBasePathV2().toString();
+    this.dbName = metaClient.getTableConfig().getDatabaseName();
+    this.tableName = metaClient.getTableConfig().getTableName();
+    this.host = config.getTableManagerHost();
+    this.port = config.getTableManagerPort();
+    this.config = config;
+    this.metaClient = metaClient;
+  }
+
+  private String executeRequest(String requestPath, Map<String, String> queryParameters) throws IOException {
+    URIBuilder builder =
+        new URIBuilder().setHost(host).setPort(port).setPath(requestPath).setScheme("http");
+    queryParameters.forEach(builder::addParameter);
+
+    String url = builder.toString();
+    LOG.info("Sending request to table management service : (" + url + ")");
+    Response response;
+    int timeout = this.config.getConnectionTimeout() * 1000; // msec
+    int requestRetryLimit = config.getConnectionRetryLimit();
+    int retry = 0;
+
+    while (retry < requestRetryLimit) {
+      try {
+        response = Request.Get(url).connectTimeout(timeout).socketTimeout(timeout).execute();
+        return response.returnContent().asString();
+      } catch (IOException e) {
+        retry++;
+        LOG.warn(String.format("Failed request to server %s, will retry %d more times", url, requestRetryLimit - retry), e);
+        if (requestRetryLimit == retry) {
+          throw e;
+        }
+      }
+
+      try {
+        TimeUnit.SECONDS.sleep(config.getConnectionRetryDelay());
+      } catch (InterruptedException e) {
+        // ignore
+      }
+    }
+
+    throw new IOException(String.format("Failed request to table management service %s after %d retries", url, requestRetryLimit));
+  }
+
+  private Map<String, String> getParamsWithAdditionalParams(String[] paramNames, String[] paramVals) {
+    Map<String, String> paramsMap = new HashMap<>();
+    paramsMap.put(BASEPATH_PARAM, basePath);
+    ValidationUtils.checkArgument(paramNames.length == paramVals.length);
+    for (int i = 0; i < paramNames.length; i++) {
+      paramsMap.put(paramNames[i], paramVals[i]);
+    }
+    return paramsMap;
+  }
+
+  public void register() {
+    try {
+      executeRequest(REGISTER_ENDPOINT, getDefaultParams(null));
+    } catch (IOException e) {
+      throw new HoodieRemoteException(e);
+    }
+  }
+
+  public void submitCompaction() {
+    try {
+      String instantRange = StringUtils.join(metaClient.reloadActiveTimeline()
+          .filterPendingCompactionTimeline()
+          .getInstants()
+          .map(HoodieInstant::getTimestamp)
+          .toArray(String[]::new), ",");
+
+      executeRequest(SUBMIT_COMPACTION, getDefaultParams(instantRange));
+    } catch (IOException e) {
+      throw new HoodieRemoteException(e);
+    }
+  }
+
+  public void submitClean() {
+    try {
+      String instantRange = StringUtils.join(metaClient.reloadActiveTimeline()
+          .getCleanerTimeline()
+          .filterInflightsAndRequested()
+          .getInstants()
+          .map(HoodieInstant::getTimestamp)
+          .toArray(String[]::new), ",");
+
+      executeRequest(SUBMIT_CLEAN, getDefaultParams(instantRange));
+    } catch (IOException e) {
+      throw new HoodieRemoteException(e);
+    }
+  }
+
+  public void submitClustering() {
+    try {
+      metaClient.reloadActiveTimeline();
+      String instantRange = StringUtils.join(ClusteringUtils.getPendingClusteringInstantTimes(metaClient)

Review Comment:
   I noticed that whenever an action is scheduled, instead of sending that particular instant, we are sending all the pending instants for that action as part of the request. This means the remote table management service has to iterate over the instants and write their contents to the MySQL/Hudi db.
   
   I am guessing this was done so that, if an action is scheduled but the notification to the table service fails, the .requested instant is still left on the timeline.
   
   One way we are trying to work around this is by sending the request before creating the .requested files.
   
   The timeline of actions would look like this:
   
   1. Generate the clustering/compaction plan for an instant `x`.
   2. After the plan is generated, send a request to the table service for the instant `x`.
   3. Create the x.clustering.requested or x.compaction.requested file on the timeline.
   
   If the request is successfully sent to the table service but the .requested file creation fails, it will be treated as a no-op by the table service when it schedules the job. There is a possibility that the table service executes right after step 2, but that can be handled by introducing some delay.
   
   The above approach is also useful if we want to send Spark-job-specific parameters to the table service for executing that instant, for example the number of executors, since the number of executors can vary depending on the plan.
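   
   A minimal Java sketch of this ordering, assuming hypothetical helper names (generatePlan, submitToTableService, writeRequestedInstant are placeholders for illustration, not existing Hudi APIs):
   
       import java.util.Collections;
       import java.util.Map;
       
       // Hypothetical sketch of the proposed scheduling order; every name
       // below is a placeholder for illustration, not an existing Hudi API.
       public class ScheduleSketch {
       
         public void schedule(String instantTime) {
           // 1. Generate the clustering/compaction plan for instant `instantTime`.
           Map<String, String> plan = generatePlan(instantTime);
       
           // 2. Send the request for this single instant to the table management
           //    service before any .requested file exists; plan-derived job
           //    parameters (e.g. number of executors) can travel with it.
           submitToTableService(instantTime, plan);
       
           // 3. Only now create <instantTime>.clustering.requested on the
           //    timeline. If this write fails, the table service finds no
           //    .requested instant and treats the earlier request as a no-op.
           writeRequestedInstant(instantTime, plan);
         }
       
         private Map<String, String> generatePlan(String instantTime) {
           return Collections.singletonMap("parallelism", "20"); // stand-in plan
         }
       
         private void submitToTableService(String instantTime, Map<String, String> plan) {
           // stand-in for an HTTP call like HoodieTableManagerClient#executeRequest
         }
       
         private void writeRequestedInstant(String instantTime, Map<String, String> plan) {
           // stand-in for writing the .requested file to the timeline
         }
       }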



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@hudi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org