You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/07/23 11:07:16 UTC

[hudi] branch master updated: [HUDI-1037] Introduce a write committed callback hook and given a default http callback implementation (#1842)

This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new da10680  [HUDI-1037] Introduce a write committed callback hook and given a default http callback implementation (#1842)
da10680 is described below

commit da106803b6d0bdbb37f22b9bd73b9b3a77f77063
Author: Mathieu <wx...@126.com>
AuthorDate: Thu Jul 23 19:07:05 2020 +0800

    [HUDI-1037] Introduce a write committed callback hook and given a default http callback implementation (#1842)
---
 .../hudi/callback/HoodieWriteCommitCallback.java   |  35 ++++++
 .../http/HoodieWriteCommitHttpCallbackClient.java  | 108 +++++++++++++++++++
 .../common/HoodieWriteCommitCallbackMessage.java   |  76 +++++++++++++
 .../impl/HoodieWriteCommitHttpCallback.java        |  58 ++++++++++
 .../callback/util/HoodieCommitCallbackFactory.java |  47 ++++++++
 .../hudi/client/AbstractHoodieWriteClient.java     |  12 +++
 .../config/HoodieWriteCommitCallbackConfig.java    | 106 ++++++++++++++++++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  20 ++++
 .../exception/HoodieCommitCallbackException.java   |  35 ++++++
 .../hudi/callback/http/TestCallbackHttpClient.java | 119 +++++++++++++++++++++
 packaging/hudi-spark-bundle/pom.xml                |   1 +
 packaging/hudi-utilities-bundle/pom.xml            |   1 +
 12 files changed, 618 insertions(+)

diff --git a/hudi-client/src/main/java/org/apache/hudi/callback/HoodieWriteCommitCallback.java b/hudi-client/src/main/java/org/apache/hudi/callback/HoodieWriteCommitCallback.java
new file mode 100644
index 0000000..2f5a4ef
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/callback/HoodieWriteCommitCallback.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.callback;
+
+import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
+
+/**
+ * A callback interface that is invoked when a write commit completes successfully.
+ */
+public interface HoodieWriteCommitCallback {
+
+  /**
+   * A callback method the user can implement to handle a successful write.
+   * This method will be called when a write operation is committed successfully.
+   *
+   * @param callbackMessage Callback message describing the commit (commit time, table name and base path).
+   */
+  void call(HoodieWriteCommitCallbackMessage callbackMessage);
+
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/callback/client/http/HoodieWriteCommitHttpCallbackClient.java b/hudi-client/src/main/java/org/apache/hudi/callback/client/http/HoodieWriteCommitHttpCallbackClient.java
new file mode 100644
index 0000000..6c41e2f
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/callback/client/http/HoodieWriteCommitHttpCallbackClient.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.callback.client.http;
+
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.hudi.config.HoodieWriteCommitCallbackConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Write commit callback http client, which posts callback messages as JSON to the configured url.
+ */
+public class HoodieWriteCommitHttpCallbackClient implements Closeable {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieWriteCommitHttpCallbackClient.class);
+
+  public static final String HEADER_KEY_API_KEY = "HUDI-CALLBACK-KEY";
+
+  private final String apiKey;
+  private final String url;
+  private final CloseableHttpClient client;
+  private Properties props;
+
+  public HoodieWriteCommitHttpCallbackClient(HoodieWriteConfig config) {
+    this.props = config.getProps();
+    this.apiKey = props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_API_KEY);
+    this.url = props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_URL_PROP);
+    this.client = getClient();
+  }
+
+  public HoodieWriteCommitHttpCallbackClient(String apiKey, String url, CloseableHttpClient client) {
+    this.apiKey = apiKey;
+    this.url = url;
+    this.client = client;
+  }
+
+  /**
+   * Posts the callback message as JSON. Sending is best effort: a missing url (which HttpPost can not
+   * parse), an {@link IOException} or a response status of 300 and above is only logged as a warning.
+   *
+   * @param callbackMsg JSON payload to send.
+   */
+  public void send(String callbackMsg) {
+    if (url == null || url.isEmpty()) {
+      LOG.warn("Failed to send callback. Callback url is not configured.");
+      return;
+    }
+    HttpPost request = new HttpPost(url);
+    request.setHeader(HEADER_KEY_API_KEY, apiKey);
+    request.setHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString());
+    request.setEntity(new StringEntity(callbackMsg, ContentType.APPLICATION_JSON));
+    try (CloseableHttpResponse response = client.execute(request)) {
+      int statusCode = response.getStatusLine().getStatusCode();
+      if (statusCode >= 300) {
+        LOG.warn(String.format("Failed to send callback message. Response was %s", response));
+      } else {
+        LOG.info(String.format("Sent Callback data %s to %s successfully !", callbackMsg, url));
+      }
+    } catch (IOException e) {
+      LOG.warn("Failed to send callback.", e);
+    }
+  }
+
+  private CloseableHttpClient getClient() {
+    // RequestConfig timeouts are in milliseconds, while the configured value is in seconds.
+    String timeoutSeconds = props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_TIMEOUT_SECONDS);
+    int timeoutMillis = Integer.parseInt(timeoutSeconds) * 1000;
+    return HttpClientBuilder.create()
+        .setDefaultRequestConfig(RequestConfig.custom()
+            .setConnectTimeout(timeoutMillis)
+            .setConnectionRequestTimeout(timeoutMillis)
+            .setSocketTimeout(timeoutMillis).build())
+        .build();
+  }
+
+  @Override
+  public void close() throws IOException {
+    client.close();
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/callback/common/HoodieWriteCommitCallbackMessage.java b/hudi-client/src/main/java/org/apache/hudi/callback/common/HoodieWriteCommitCallbackMessage.java
new file mode 100644
index 0000000..0233fee
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/callback/common/HoodieWriteCommitCallbackMessage.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.callback.common;
+
+import java.io.Serializable;
+
+/**
+ * Base callback message, which contains commitTime, tableName and basePath only for now.
+ */
+public class HoodieWriteCommitCallbackMessage implements Serializable {
+
+  private static final long serialVersionUID = -3033643980627719561L;
+
+  /**
+   * CommitTime for one batch write, this is required.
+   */
+  private String commitTime;
+
+  /**
+   * Name of the table this batch commits to.
+   */
+  private String tableName;
+
+  /**
+   * Base path where the table is located.
+   */
+  private String basePath;
+
+  public HoodieWriteCommitCallbackMessage() {
+  }
+
+  public HoodieWriteCommitCallbackMessage(String commitTime, String tableName, String basePath) {
+    this.commitTime = commitTime;
+    this.tableName = tableName;
+    this.basePath = basePath;
+  }
+
+  public String getCommitTime() {
+    return commitTime;
+  }
+
+  public void setCommitTime(String commitTime) {
+    this.commitTime = commitTime;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  public String getBasePath() {
+    return basePath;
+  }
+
+  public void setBasePath(String basePath) {
+    this.basePath = basePath;
+  }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/callback/impl/HoodieWriteCommitHttpCallback.java b/hudi-client/src/main/java/org/apache/hudi/callback/impl/HoodieWriteCommitHttpCallback.java
new file mode 100644
index 0000000..910e626
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/callback/impl/HoodieWriteCommitHttpCallback.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.callback.impl;
+
+import org.apache.hudi.callback.HoodieWriteCommitCallback;
+import org.apache.hudi.callback.client.http.HoodieWriteCommitHttpCallbackClient;
+import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitCallbackException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * {@link HoodieWriteCommitCallback} implementation that delivers the callback message over http.
+ */
+public class HoodieWriteCommitHttpCallback implements HoodieWriteCommitCallback {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieWriteCommitHttpCallback.class);
+
+  private final HoodieWriteCommitHttpCallbackClient client;
+
+  public HoodieWriteCommitHttpCallback(HoodieWriteConfig config) {
+    // The http client reads its settings (url, api key, timeout) from the write config.
+    this.client = new HoodieWriteCommitHttpCallbackClient(config);
+  }
+
+  @Override
+  public void call(HoodieWriteCommitCallbackMessage callbackMessage) {
+    // Serialize first: a message that can not be rendered as JSON is never handed to the client.
+    final String payload;
+    try {
+      payload = new ObjectMapper().writeValueAsString(callbackMessage);
+    } catch (IOException e) {
+      throw new HoodieCommitCallbackException("Callback service convert message to json failed", e);
+    }
+    LOG.info("Try to send callbackMsg, msg = " + payload);
+    client.send(payload);
+  }
+
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/callback/util/HoodieCommitCallbackFactory.java b/hudi-client/src/main/java/org/apache/hudi/callback/util/HoodieCommitCallbackFactory.java
new file mode 100644
index 0000000..9d1e9c3
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/callback/util/HoodieCommitCallbackFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.callback.util;
+
+import org.apache.hudi.callback.HoodieWriteCommitCallback;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitCallbackException;
+
+import static org.apache.hudi.config.HoodieWriteCommitCallbackConfig.CALLBACK_CLASS_PROP;
+
+/**
+ * Factory that instantiates the configured {@link HoodieWriteCommitCallback}.
+ */
+public class HoodieCommitCallbackFactory {
+  public static HoodieWriteCommitCallback create(HoodieWriteConfig config) {
+    final String callbackClass = config.getCallbackClass();
+    // Guard clause: a callback class must be configured before anything is loaded.
+    if (StringUtils.isNullOrEmpty(callbackClass)) {
+      throw new HoodieCommitCallbackException(String.format("The value of the config option %s can not be null or "
+          + "empty", CALLBACK_CLASS_PROP));
+    }
+    final Object instance = ReflectionUtils.loadClass(callbackClass, config);
+    if (instance instanceof HoodieWriteCommitCallback) {
+      return (HoodieWriteCommitCallback) instance;
+    }
+    throw new HoodieCommitCallbackException(callbackClass + " is not a subclass of "
+        + HoodieWriteCommitCallback.class.getSimpleName());
+  }
+
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index b922caa..644aca9 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -19,6 +19,9 @@
 package org.apache.hudi.client;
 
 import com.codahale.metrics.Timer;
+import org.apache.hudi.callback.HoodieWriteCommitCallback;
+import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
+import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -60,6 +63,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
 
   private transient Timer.Context writeContext = null;
   private transient WriteOperationType operationType;
+  private transient HoodieWriteCommitCallback commitCallback;
 
   public void setOperationType(WriteOperationType operationType) {
     this.operationType = operationType;
@@ -124,6 +128,14 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
       throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
           e);
     }
+
+    // callback if needed.
+    if (config.writeCommitCallbackOn()) {
+      if (null == commitCallback) {
+        commitCallback = HoodieCommitCallbackFactory.create(config);
+      }
+      commitCallback.call(new HoodieWriteCommitCallbackMessage(instantTime, config.getTableName(), config.getBasePath()));
+    }
     return true;
   }
 
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java
new file mode 100644
index 0000000..47a01aa
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Write commit callback related configs.
+ */
+public class HoodieWriteCommitCallbackConfig extends DefaultHoodieConfig {
+
+  public static final String CALLBACK_ON = "hoodie.write.commit.callback.on";
+  public static final boolean DEFAULT_CALLBACK_ON = false;
+
+  public static final String CALLBACK_CLASS_PROP = "hoodie.write.commit.callback.class";
+  public static final String DEFAULT_CALLBACK_CLASS_PROP = "org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback";
+
+  // ***** HTTP callback configs (the url has no default value) *****
+  public static final String CALLBACK_HTTP_URL_PROP = "hoodie.write.commit.callback.http.url";
+  public static final String CALLBACK_HTTP_API_KEY = "hoodie.write.commit.callback.http.api.key";
+  public static final String DEFAULT_CALLBACK_HTTP_API_KEY = "hudi_write_commit_http_callback";
+  public static final String CALLBACK_HTTP_TIMEOUT_SECONDS = "hoodie.write.commit.callback.http.timeout.seconds";
+  public static final int DEFAULT_CALLBACK_HTTP_TIMEOUT_SECONDS = 3;
+
+  private HoodieWriteCommitCallbackConfig(Properties props) {
+    super(props);
+  }
+
+  public static HoodieWriteCommitCallbackConfig.Builder newBuilder() {
+    return new HoodieWriteCommitCallbackConfig.Builder();
+  }
+
+  public static class Builder {
+
+    private final Properties props = new Properties();
+
+    public HoodieWriteCommitCallbackConfig.Builder fromFile(File propertiesFile) throws IOException {
+      try (FileReader reader = new FileReader(propertiesFile)) {
+        this.props.load(reader);
+        return this;
+      }
+    }
+
+    public HoodieWriteCommitCallbackConfig.Builder fromProperties(Properties props) {
+      this.props.putAll(props);
+      return this;
+    }
+
+    public HoodieWriteCommitCallbackConfig.Builder writeCommitCallbackOn(String callbackOn) {
+      props.setProperty(CALLBACK_ON, callbackOn);
+      return this;
+    }
+
+    public HoodieWriteCommitCallbackConfig.Builder withCallbackClass(String callbackClass) {
+      props.setProperty(CALLBACK_CLASS_PROP, callbackClass);
+      return this;
+    }
+
+    public HoodieWriteCommitCallbackConfig.Builder withCallbackHttpUrl(String url) {
+      props.setProperty(CALLBACK_HTTP_URL_PROP, url);
+      return this;
+    }
+
+    public Builder withCallbackHttpTimeoutSeconds(String timeoutSeconds) {
+      props.setProperty(CALLBACK_HTTP_TIMEOUT_SECONDS, timeoutSeconds);
+      return this;
+    }
+
+    public Builder withCallbackHttpApiKey(String apiKey) {
+      props.setProperty(CALLBACK_HTTP_API_KEY, apiKey);
+      return this;
+    }
+
+    public HoodieWriteCommitCallbackConfig build() {
+      HoodieWriteCommitCallbackConfig config = new HoodieWriteCommitCallbackConfig(props);
+      setDefaultOnCondition(props, !props.containsKey(CALLBACK_ON), CALLBACK_ON, String.valueOf(DEFAULT_CALLBACK_ON));
+      setDefaultOnCondition(props, !props.containsKey(CALLBACK_CLASS_PROP), CALLBACK_CLASS_PROP, DEFAULT_CALLBACK_CLASS_PROP);
+      setDefaultOnCondition(props, !props.containsKey(CALLBACK_HTTP_API_KEY), CALLBACK_HTTP_API_KEY, DEFAULT_CALLBACK_HTTP_API_KEY);
+      setDefaultOnCondition(props, !props.containsKey(CALLBACK_HTTP_TIMEOUT_SECONDS), CALLBACK_HTTP_TIMEOUT_SECONDS,
+          String.valueOf(DEFAULT_CALLBACK_HTTP_TIMEOUT_SECONDS));
+      // NOTE(review): defaults were set after constructing config; assumes config shares this props instance - confirm.
+      return config;
+    }
+  }
+
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index aefde2c..d51832d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -642,6 +642,17 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     return clientSpecifiedViewStorageConfig;
   }
 
+  /**
+   * Commit callback configs.
+   */
+  public boolean writeCommitCallbackOn() {
+    return Boolean.parseBoolean(props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_ON));
+  }
+
+  public String getCallbackClass() {
+    return props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_CLASS_PROP);
+  }
+
   public static class Builder {
 
     private final Properties props = new Properties();
@@ -652,6 +663,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     private boolean isMemoryConfigSet = false;
     private boolean isViewConfigSet = false;
     private boolean isConsistencyGuardSet = false;
+    private boolean isCallbackConfigSet = false;
 
     public Builder fromFile(File propertiesFile) throws IOException {
       try (FileReader reader = new FileReader(propertiesFile)) {
@@ -798,6 +810,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
       return this;
     }
 
+    public Builder withCallbackConfig(HoodieWriteCommitCallbackConfig callbackConfig) {
+      props.putAll(callbackConfig.getProps());
+      isCallbackConfigSet = true;
+      return this;
+    }
+
     public Builder withFinalizeWriteParallelism(int parallelism) {
       props.setProperty(FINALIZE_WRITE_PARALLELISM, String.valueOf(parallelism));
       return this;
@@ -865,6 +883,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
           FileSystemViewStorageConfig.newBuilder().fromProperties(props).build());
       setDefaultOnCondition(props, !isConsistencyGuardSet,
           ConsistencyGuardConfig.newBuilder().fromProperties(props).build());
+      setDefaultOnCondition(props, !isCallbackConfigSet,
+          HoodieWriteCommitCallbackConfig.newBuilder().fromProperties(props).build());
 
       setDefaultOnCondition(props, !props.containsKey(TIMELINE_LAYOUT_VERSION), TIMELINE_LAYOUT_VERSION,
           String.valueOf(TimelineLayoutVersion.CURR_VERSION));
diff --git a/hudi-client/src/main/java/org/apache/hudi/exception/HoodieCommitCallbackException.java b/hudi-client/src/main/java/org/apache/hudi/exception/HoodieCommitCallbackException.java
new file mode 100644
index 0000000..57468cb
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/exception/HoodieCommitCallbackException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+import org.apache.hudi.callback.HoodieWriteCommitCallback;
+
+/**
+ * Exception thrown for any higher-level error while a {@link HoodieWriteCommitCallback} is created or executed.
+ */
+public class HoodieCommitCallbackException extends HoodieException {
+
+  public HoodieCommitCallbackException(String msg, Throwable e) {
+    super(msg, e);
+  }
+
+  public HoodieCommitCallbackException(String msg) {
+    super(msg);
+  }
+}
\ No newline at end of file
diff --git a/hudi-client/src/test/java/org/apache/hudi/callback/http/TestCallbackHttpClient.java b/hudi-client/src/test/java/org/apache/hudi/callback/http/TestCallbackHttpClient.java
new file mode 100644
index 0000000..616dc31
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/callback/http/TestCallbackHttpClient.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.callback.http;
+
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.hudi.callback.client.http.HoodieWriteCommitHttpCallbackClient;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit tests for {@link HoodieWriteCommitHttpCallbackClient}, asserting on the log event emitted by each send.
+ */
+@ExtendWith(MockitoExtension.class)
+public class TestCallbackHttpClient {
+
+  @Mock
+  AppenderSkeleton appender;
+
+  @Captor
+  ArgumentCaptor<LoggingEvent> logCaptor;
+
+  @Mock
+  CloseableHttpClient httpClient;
+
+  @Mock
+  CloseableHttpResponse httpResponse;
+
+  @Mock
+  StatusLine statusLine;
+
+  private void mockResponse(int statusCode) {
+    when(statusLine.getStatusCode()).thenReturn(statusCode);
+    when(httpResponse.getStatusLine()).thenReturn(statusLine);
+    try {
+      when(httpClient.execute(any())).thenReturn(httpResponse);
+    } catch (IOException e) {
+      fail(e.getMessage(), e);
+    }
+  }
+
+  @Test
+  public void sendPayloadShouldLogWhenRequestFailed() throws IOException {
+    Logger.getRootLogger().addAppender(appender); // NOTE(review): the appender is never removed from the root logger
+    when(httpClient.execute(any())).thenThrow(IOException.class); // simulate a transport failure
+
+    HoodieWriteCommitHttpCallbackClient hoodieWriteCommitCallBackHttpClient =
+        new HoodieWriteCommitHttpCallbackClient("fake_api_key", "fake_url", httpClient);
+    hoodieWriteCommitCallBackHttpClient.send("{}");
+
+    verify(appender).doAppend(logCaptor.capture());
+    assertEquals("Failed to send callback.", logCaptor.getValue().getRenderedMessage());
+    assertEquals(Level.WARN, logCaptor.getValue().getLevel());
+  }
+
+  @Test
+  public void sendPayloadShouldLogUnsuccessfulSending() {
+    Logger.getRootLogger().addAppender(appender);
+    mockResponse(401);
+    when(httpResponse.toString()).thenReturn("unauthorized"); // the client formats the response itself into the warning
+
+    HoodieWriteCommitHttpCallbackClient hoodieWriteCommitCallBackHttpClient =
+        new HoodieWriteCommitHttpCallbackClient("fake_api_key", "fake_url", httpClient);
+    hoodieWriteCommitCallBackHttpClient.send("{}");
+
+    verify(appender).doAppend(logCaptor.capture());
+    assertEquals("Failed to send callback message. Response was unauthorized", logCaptor.getValue().getRenderedMessage());
+    assertEquals(Level.WARN, logCaptor.getValue().getLevel());
+  }
+
+  @Test
+  public void sendPayloadShouldLogSuccessfulSending() {
+    Logger.getRootLogger().addAppender(appender);
+    mockResponse(202); // any status below 300 is treated as a successful send
+
+    HoodieWriteCommitHttpCallbackClient hoodieWriteCommitCallBackHttpClient =
+        new HoodieWriteCommitHttpCallbackClient("fake_api_key", "fake_url", httpClient);
+    hoodieWriteCommitCallBackHttpClient.send("{}");
+
+    verify(appender).doAppend(logCaptor.capture());
+    assertTrue(logCaptor.getValue().getRenderedMessage().startsWith("Sent Callback data"));
+    assertEquals(Level.INFO, logCaptor.getValue().getLevel());
+  }
+
+}
diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml
index e0e60a9..f4affca 100644
--- a/packaging/hudi-spark-bundle/pom.xml
+++ b/packaging/hudi-spark-bundle/pom.xml
@@ -78,6 +78,7 @@
                   <include>org.jetbrains.kotlin:*</include>
                   <include>org.rocksdb:rocksdbjni</include>
                   <include>org.apache.httpcomponents:httpclient</include>
+                  <include>org.apache.httpcomponents:httpcore</include>
                   <include>org.apache.httpcomponents:fluent-hc</include>
                   <include>org.antlr:stringtemplate</include>
                   <include>org.apache.parquet:parquet-avro</include>
diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml
index 2da82f2..8cf3407 100644
--- a/packaging/hudi-utilities-bundle/pom.xml
+++ b/packaging/hudi-utilities-bundle/pom.xml
@@ -81,6 +81,7 @@
                   <include>org.jetbrains.kotlin:*</include>
                   <include>org.rocksdb:rocksdbjni</include>
                   <include>org.apache.httpcomponents:httpclient</include>
+                  <include>org.apache.httpcomponents:httpcore</include>
                   <include>org.apache.httpcomponents:fluent-hc</include>
                   <include>org.antlr:stringtemplate</include>
                   <include>org.apache.parquet:parquet-avro</include>