You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by GitBox <gi...@apache.org> on 2020/07/21 00:56:52 UTC

[GitHub] [hudi] yanghua commented on a change in pull request #1842: [HUDI-1037]Introduce a write committed callback hook

yanghua commented on a change in pull request #1842:
URL: https://github.com/apache/hudi/pull/1842#discussion_r457769421



##########
File path: hudi-client/src/main/java/org/apache/hudi/callback/HoodieWriteCommitCallback.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.callback;
+
+/**
+ * A callback interface help to call back when a write commit completes successfully.
+ */
+public interface HoodieWriteCommitCallback {
+
+  /**
+   * A callback method the user can implement to provide asynchronous handling of successful write.
+   * This method will be called when a write operation is committed successfully.
+   *
+   * @param commitTime commitTime which is successfully committed
+   */
+  void call(String commitTime);

Review comment:
       Can we provide both commit instant and table name?

##########
File path: hudi-client/src/main/java/org/apache/hudi/callback/common/HoodieBaseCommitCallbackMessage.java
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.callback.common;
+
+import java.io.Serializable;
+
+/**
+ * Base callback message, which contains commitTime and tableName only for now.
+ */
+public class HoodieBaseCommitCallbackMessage implements Serializable {

Review comment:
       If we provide the  table name as a parameter in `HoodieWriteCommitCallback#call(...)`, then this class is not necessary, IMO.

##########
File path: hudi-client/src/main/java/org/apache/hudi/callback/client/http/HoodieWriteCallbackHttpClient.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.callback.client.http;
+
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.hudi.config.HoodieWriteCommitCallbackConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Write commit callback http client.
+ */
+public class HoodieWriteCallbackHttpClient implements Closeable {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieWriteCallbackHttpClient.class);
+
+  public static final String HEADER_KEY_API_KEY = "HUDI-CALLBACK-KEY";
+
+  private final String apiKey;
+  private final String url;
+  private final CloseableHttpClient client;
+  private Properties props;
+
+  public HoodieWriteCallbackHttpClient(HoodieWriteConfig config) {
+    this.props = config.getProps();
+    this.apiKey = getApiKey();
+    this.url = getUrl();
+    this.client = getClient();
+  }
+
+  public HoodieWriteCallbackHttpClient(String apiKey, String url, CloseableHttpClient client) {
+    this.apiKey = apiKey;
+    this.url = url;
+    this.client = client;
+  }
+
+  public void send(String callbackMsg) {
+    HttpPost request = new HttpPost(url);
+    request.setHeader(HEADER_KEY_API_KEY, apiKey);
+    request.setHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString());
+    request.setEntity(new StringEntity(callbackMsg, ContentType.APPLICATION_JSON));
+    try (CloseableHttpResponse response = client.execute(request)) {
+      int statusCode = response.getStatusLine().getStatusCode();
+      if (statusCode >= 300) {
+        LOG.warn(String.format("Failed to send callback message. Response was %s", response));
+      } else {
+        LOG.info(String.format("Sent Callback data %s to %s successfully !", callbackMsg, url));
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to send callback.", e);
+    }
+  }
+
+  private String getApiKey() {
+    return props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_API_KEY);
+  }
+
+  private String getUrl() {
+    return props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_URL_PROP);
+  }
+
+  private CloseableHttpClient getClient() {
+    Integer timeoutSeconds = getHttpTimeoutSeconds();
+    return HttpClientBuilder.create()
+        .setDefaultRequestConfig(RequestConfig.custom()
+            .setConnectTimeout(timeoutSeconds * 1000)

Review comment:
       We may need to consider making these hard codes configurable.

##########
File path: hudi-client/src/main/java/org/apache/hudi/callback/impl/HoodieHttpWriteCommitCallback.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.callback.impl;
+
+import org.apache.hudi.callback.HoodieWriteCommitCallback;
+import org.apache.hudi.callback.client.http.HoodieWriteCallbackHttpClient;
+import org.apache.hudi.callback.common.HoodieBaseCommitCallbackMessage;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitCallbackException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * A http implementation of {@link HoodieWriteCommitCallback}.
+ */
+public class HoodieHttpWriteCommitCallback implements HoodieWriteCommitCallback {

Review comment:
       Is `HoodieWriteCommitHttpCallback` more reasonable?

##########
File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -632,6 +632,21 @@ public FileSystemViewStorageConfig getClientSpecifiedViewStorageConfig() {
     return clientSpecifiedViewStorageConfig;
   }
 
+  /**
+   * Commit call back configs.
+   */
+  public boolean writeCommitCallbackOn() {
+    return Boolean.parseBoolean(props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_ON));
+  }
+
+  public String getCallbackType() {

Review comment:
       IMO, this property is not necessary. We can only depend on `getCallbackClass ` to specify the detailed implementation. This is a simple **SPI** mode.  Otherwise, it causes two entry points in the factory you implemented. WDYT?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org