You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by vi...@apache.org on 2020/07/23 11:07:16 UTC
[hudi] branch master updated: [HUDI-1037] Introduce a write
committed callback hook and given a default http callback implementation
(#1842)
This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new da10680 [HUDI-1037] Introduce a write committed callback hook and given a default http callback implementation (#1842)
da10680 is described below
commit da106803b6d0bdbb37f22b9bd73b9b3a77f77063
Author: Mathieu <wx...@126.com>
AuthorDate: Thu Jul 23 19:07:05 2020 +0800
[HUDI-1037] Introduce a write committed callback hook and given a default http callback implementation (#1842)
---
.../hudi/callback/HoodieWriteCommitCallback.java | 35 ++++++
.../http/HoodieWriteCommitHttpCallbackClient.java | 108 +++++++++++++++++++
.../common/HoodieWriteCommitCallbackMessage.java | 76 +++++++++++++
.../impl/HoodieWriteCommitHttpCallback.java | 58 ++++++++++
.../callback/util/HoodieCommitCallbackFactory.java | 47 ++++++++
.../hudi/client/AbstractHoodieWriteClient.java | 12 +++
.../config/HoodieWriteCommitCallbackConfig.java | 106 ++++++++++++++++++
.../org/apache/hudi/config/HoodieWriteConfig.java | 20 ++++
.../exception/HoodieCommitCallbackException.java | 35 ++++++
.../hudi/callback/http/TestCallbackHttpClient.java | 119 +++++++++++++++++++++
packaging/hudi-spark-bundle/pom.xml | 1 +
packaging/hudi-utilities-bundle/pom.xml | 1 +
12 files changed, 618 insertions(+)
diff --git a/hudi-client/src/main/java/org/apache/hudi/callback/HoodieWriteCommitCallback.java b/hudi-client/src/main/java/org/apache/hudi/callback/HoodieWriteCommitCallback.java
new file mode 100644
index 0000000..2f5a4ef
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/callback/HoodieWriteCommitCallback.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.callback;
+
+import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
+
+/**
+ * A callback interface help to call back when a write commit completes successfully.
+ */
+public interface HoodieWriteCommitCallback {
+
+ /**
+ * A callback method the user can implement to provide asynchronous handling of successful write.
+ * This method will be called when a write operation is committed successfully.
+ *
+ * @param callbackMessage Callback msg, which will be sent to external system.
+ */
+ void call(HoodieWriteCommitCallbackMessage callbackMessage);
+
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/callback/client/http/HoodieWriteCommitHttpCallbackClient.java b/hudi-client/src/main/java/org/apache/hudi/callback/client/http/HoodieWriteCommitHttpCallbackClient.java
new file mode 100644
index 0000000..6c41e2f
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/callback/client/http/HoodieWriteCommitHttpCallbackClient.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.callback.client.http;
+
+import org.apache.http.HttpHeaders;
+import org.apache.http.client.config.RequestConfig;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.hudi.config.HoodieWriteCommitCallbackConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Write commit callback http client.
+ */
+public class HoodieWriteCommitHttpCallbackClient implements Closeable {
+
+ private static final Logger LOG = LogManager.getLogger(HoodieWriteCommitHttpCallbackClient.class);
+
+ public static final String HEADER_KEY_API_KEY = "HUDI-CALLBACK-KEY";
+
+ private final String apiKey;
+ private final String url;
+ private final CloseableHttpClient client;
+ private Properties props;
+
+ public HoodieWriteCommitHttpCallbackClient(HoodieWriteConfig config) {
+ this.props = config.getProps();
+ this.apiKey = getApiKey();
+ this.url = getUrl();
+ this.client = getClient();
+ }
+
+ public HoodieWriteCommitHttpCallbackClient(String apiKey, String url, CloseableHttpClient client) {
+ this.apiKey = apiKey;
+ this.url = url;
+ this.client = client;
+ }
+
+ public void send(String callbackMsg) {
+ HttpPost request = new HttpPost(url);
+ request.setHeader(HEADER_KEY_API_KEY, apiKey);
+ request.setHeader(HttpHeaders.CONTENT_TYPE, ContentType.APPLICATION_JSON.toString());
+ request.setEntity(new StringEntity(callbackMsg, ContentType.APPLICATION_JSON));
+ try (CloseableHttpResponse response = client.execute(request)) {
+ int statusCode = response.getStatusLine().getStatusCode();
+ if (statusCode >= 300) {
+ LOG.warn(String.format("Failed to send callback message. Response was %s", response));
+ } else {
+ LOG.info(String.format("Sent Callback data %s to %s successfully !", callbackMsg, url));
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed to send callback.", e);
+ }
+ }
+
+ private String getApiKey() {
+ return props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_API_KEY);
+ }
+
+ private String getUrl() {
+ return props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_URL_PROP);
+ }
+
+ private CloseableHttpClient getClient() {
+ int timeoutSeconds = getHttpTimeoutSeconds() * 1000;
+ return HttpClientBuilder.create()
+ .setDefaultRequestConfig(RequestConfig.custom()
+ .setConnectTimeout(timeoutSeconds)
+ .setConnectionRequestTimeout(timeoutSeconds)
+ .setSocketTimeout(timeoutSeconds).build())
+ .build();
+ }
+
+ private Integer getHttpTimeoutSeconds() {
+ return Integer.parseInt(props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_HTTP_TIMEOUT_SECONDS));
+ }
+
+ @Override
+ public void close() throws IOException {
+ client.close();
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/callback/common/HoodieWriteCommitCallbackMessage.java b/hudi-client/src/main/java/org/apache/hudi/callback/common/HoodieWriteCommitCallbackMessage.java
new file mode 100644
index 0000000..0233fee
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/callback/common/HoodieWriteCommitCallbackMessage.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.callback.common;
+
+import java.io.Serializable;
+
+/**
+ * Base callback message, which contains commitTime and tableName only for now.
+ */
+public class HoodieWriteCommitCallbackMessage implements Serializable {
+
+ private static final long serialVersionUID = -3033643980627719561L;
+
+ /**
+ * CommitTime for one batch write, this is required.
+ */
+ private String commitTime;
+
+ /**
+ * Table name this batch commit to.
+ */
+ private String tableName;
+
+ /**
+ * BathPath the table located.
+ */
+ private String basePath;
+
+ public HoodieWriteCommitCallbackMessage() {
+ }
+
+ public HoodieWriteCommitCallbackMessage(String commitTime, String tableName, String basePath) {
+ this.commitTime = commitTime;
+ this.tableName = tableName;
+ this.basePath = basePath;
+ }
+
+ public String getCommitTime() {
+ return commitTime;
+ }
+
+ public void setCommitTime(String commitTime) {
+ this.commitTime = commitTime;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public String getBasePath() {
+ return basePath;
+ }
+
+ public void setBasePath(String basePath) {
+ this.basePath = basePath;
+ }
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/callback/impl/HoodieWriteCommitHttpCallback.java b/hudi-client/src/main/java/org/apache/hudi/callback/impl/HoodieWriteCommitHttpCallback.java
new file mode 100644
index 0000000..910e626
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/callback/impl/HoodieWriteCommitHttpCallback.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.callback.impl;
+
+import org.apache.hudi.callback.HoodieWriteCommitCallback;
+import org.apache.hudi.callback.client.http.HoodieWriteCommitHttpCallbackClient;
+import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitCallbackException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import java.io.IOException;
+
+/**
+ * A http implementation of {@link HoodieWriteCommitCallback}.
+ */
+public class HoodieWriteCommitHttpCallback implements HoodieWriteCommitCallback {
+
+ private static final Logger LOG = LogManager.getLogger(HoodieWriteCommitHttpCallback.class);
+
+ private final HoodieWriteCommitHttpCallbackClient client;
+
+ public HoodieWriteCommitHttpCallback(HoodieWriteConfig config) {
+ this.client = new HoodieWriteCommitHttpCallbackClient(config);
+ }
+
+ @Override
+ public void call(HoodieWriteCommitCallbackMessage callbackMessage) {
+ // convert to json
+ ObjectMapper mapper = new ObjectMapper();
+ String callbackMsg = null;
+ try {
+ callbackMsg = mapper.writeValueAsString(callbackMessage);
+ } catch (IOException e) {
+ throw new HoodieCommitCallbackException("Callback service convert message to json failed", e);
+ }
+ LOG.info("Try to send callbackMsg, msg = " + callbackMsg);
+ client.send(callbackMsg);
+ }
+
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/callback/util/HoodieCommitCallbackFactory.java b/hudi-client/src/main/java/org/apache/hudi/callback/util/HoodieCommitCallbackFactory.java
new file mode 100644
index 0000000..9d1e9c3
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/callback/util/HoodieCommitCallbackFactory.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.callback.util;
+
+import org.apache.hudi.callback.HoodieWriteCommitCallback;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitCallbackException;
+
+import static org.apache.hudi.config.HoodieWriteCommitCallbackConfig.CALLBACK_CLASS_PROP;
+
+/**
+ * Factory help to create {@link HoodieWriteCommitCallback}.
+ */
+public class HoodieCommitCallbackFactory {
+ public static HoodieWriteCommitCallback create(HoodieWriteConfig config) {
+ String callbackClass = config.getCallbackClass();
+ if (!StringUtils.isNullOrEmpty(callbackClass)) {
+ Object instance = ReflectionUtils.loadClass(callbackClass, config);
+ if (!(instance instanceof HoodieWriteCommitCallback)) {
+ throw new HoodieCommitCallbackException(callbackClass + " is not a subclass of "
+ + HoodieWriteCommitCallback.class.getSimpleName());
+ }
+ return (HoodieWriteCommitCallback) instance;
+ } else {
+ throw new HoodieCommitCallbackException(String.format("The value of the config option %s can not be null or "
+ + "empty", CALLBACK_CLASS_PROP));
+ }
+ }
+
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
index b922caa..644aca9 100644
--- a/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
+++ b/hudi-client/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
@@ -19,6 +19,9 @@
package org.apache.hudi.client;
import com.codahale.metrics.Timer;
+import org.apache.hudi.callback.HoodieWriteCommitCallback;
+import org.apache.hudi.callback.common.HoodieWriteCommitCallbackMessage;
+import org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -60,6 +63,7 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
private transient Timer.Context writeContext = null;
private transient WriteOperationType operationType;
+ private transient HoodieWriteCommitCallback commitCallback;
public void setOperationType(WriteOperationType operationType) {
this.operationType = operationType;
@@ -124,6 +128,14 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload> e
throw new HoodieCommitException("Failed to complete commit " + config.getBasePath() + " at time " + instantTime,
e);
}
+
+ // callback if needed.
+ if (config.writeCommitCallbackOn()) {
+ if (null == commitCallback) {
+ commitCallback = HoodieCommitCallbackFactory.create(config);
+ }
+ commitCallback.call(new HoodieWriteCommitCallbackMessage(instantTime, config.getTableName(), config.getBasePath()));
+ }
return true;
}
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java
new file mode 100644
index 0000000..47a01aa
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteCommitCallbackConfig.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.DefaultHoodieConfig;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Write callback related config.
+ */
+public class HoodieWriteCommitCallbackConfig extends DefaultHoodieConfig {
+
+ public static final String CALLBACK_ON = "hoodie.write.commit.callback.on";
+ public static final boolean DEFAULT_CALLBACK_ON = false;
+
+ public static final String CALLBACK_CLASS_PROP = "hoodie.write.commit.callback.class";
+ public static final String DEFAULT_CALLBACK_CLASS_PROP = "org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback";
+
+ // ***** REST callback configs *****
+ public static final String CALLBACK_HTTP_URL_PROP = "hoodie.write.commit.callback.http.url";
+ public static final String CALLBACK_HTTP_API_KEY = "hoodie.write.commit.callback.http.api.key";
+ public static final String DEFAULT_CALLBACK_HTTP_API_KEY = "hudi_write_commit_http_callback";
+ public static final String CALLBACK_HTTP_TIMEOUT_SECONDS = "hoodie.write.commit.callback.http.timeout.seconds";
+ public static final int DEFAULT_CALLBACK_HTTP_TIMEOUT_SECONDS = 3;
+
+ private HoodieWriteCommitCallbackConfig(Properties props) {
+ super(props);
+ }
+
+ public static HoodieWriteCommitCallbackConfig.Builder newBuilder() {
+ return new HoodieWriteCommitCallbackConfig.Builder();
+ }
+
+ public static class Builder {
+
+ private final Properties props = new Properties();
+
+ public HoodieWriteCommitCallbackConfig.Builder fromFile(File propertiesFile) throws IOException {
+ try (FileReader reader = new FileReader(propertiesFile)) {
+ this.props.load(reader);
+ return this;
+ }
+ }
+
+ public HoodieWriteCommitCallbackConfig.Builder fromProperties(Properties props) {
+ this.props.putAll(props);
+ return this;
+ }
+
+ public HoodieWriteCommitCallbackConfig.Builder writeCommitCallbackOn(String callbackOn) {
+ props.setProperty(CALLBACK_ON, callbackOn);
+ return this;
+ }
+
+ public HoodieWriteCommitCallbackConfig.Builder withCallbackClass(String callbackClass) {
+ props.setProperty(CALLBACK_CLASS_PROP, callbackClass);
+ return this;
+ }
+
+ public HoodieWriteCommitCallbackConfig.Builder withCallbackHttpUrl(String url) {
+ props.setProperty(CALLBACK_HTTP_URL_PROP, url);
+ return this;
+ }
+
+ public Builder withCallbackHttpTimeoutSeconds(String timeoutSeconds) {
+ props.setProperty(CALLBACK_HTTP_TIMEOUT_SECONDS, timeoutSeconds);
+ return this;
+ }
+
+ public Builder withCallbackHttpApiKey(String apiKey) {
+ props.setProperty(CALLBACK_HTTP_API_KEY, apiKey);
+ return this;
+ }
+
+ public HoodieWriteCommitCallbackConfig build() {
+ HoodieWriteCommitCallbackConfig config = new HoodieWriteCommitCallbackConfig(props);
+ setDefaultOnCondition(props, !props.containsKey(CALLBACK_ON), CALLBACK_ON, String.valueOf(DEFAULT_CALLBACK_ON));
+ setDefaultOnCondition(props, !props.containsKey(CALLBACK_CLASS_PROP), CALLBACK_CLASS_PROP, DEFAULT_CALLBACK_CLASS_PROP);
+ setDefaultOnCondition(props, !props.containsKey(CALLBACK_HTTP_API_KEY), CALLBACK_HTTP_API_KEY, DEFAULT_CALLBACK_HTTP_API_KEY);
+ setDefaultOnCondition(props, !props.containsKey(CALLBACK_HTTP_TIMEOUT_SECONDS), CALLBACK_HTTP_TIMEOUT_SECONDS,
+ String.valueOf(DEFAULT_CALLBACK_HTTP_TIMEOUT_SECONDS));
+
+ return config;
+ }
+ }
+
+}
diff --git a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index aefde2c..d51832d 100644
--- a/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -642,6 +642,17 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return clientSpecifiedViewStorageConfig;
}
+ /**
+ * Commit call back configs.
+ */
+ public boolean writeCommitCallbackOn() {
+ return Boolean.parseBoolean(props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_ON));
+ }
+
+ public String getCallbackClass() {
+ return props.getProperty(HoodieWriteCommitCallbackConfig.CALLBACK_CLASS_PROP);
+ }
+
public static class Builder {
private final Properties props = new Properties();
@@ -652,6 +663,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
private boolean isMemoryConfigSet = false;
private boolean isViewConfigSet = false;
private boolean isConsistencyGuardSet = false;
+ private boolean isCallbackConfigSet = false;
public Builder fromFile(File propertiesFile) throws IOException {
try (FileReader reader = new FileReader(propertiesFile)) {
@@ -798,6 +810,12 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
return this;
}
+ public Builder withCallbackConfig(HoodieWriteCommitCallbackConfig callbackConfig) {
+ props.putAll(callbackConfig.getProps());
+ isCallbackConfigSet = true;
+ return this;
+ }
+
public Builder withFinalizeWriteParallelism(int parallelism) {
props.setProperty(FINALIZE_WRITE_PARALLELISM, String.valueOf(parallelism));
return this;
@@ -865,6 +883,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
FileSystemViewStorageConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isConsistencyGuardSet,
ConsistencyGuardConfig.newBuilder().fromProperties(props).build());
+ setDefaultOnCondition(props, !isCallbackConfigSet,
+ HoodieWriteCommitCallbackConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !props.containsKey(TIMELINE_LAYOUT_VERSION), TIMELINE_LAYOUT_VERSION,
String.valueOf(TimelineLayoutVersion.CURR_VERSION));
diff --git a/hudi-client/src/main/java/org/apache/hudi/exception/HoodieCommitCallbackException.java b/hudi-client/src/main/java/org/apache/hudi/exception/HoodieCommitCallbackException.java
new file mode 100644
index 0000000..57468cb
--- /dev/null
+++ b/hudi-client/src/main/java/org/apache/hudi/exception/HoodieCommitCallbackException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.exception;
+
+import org.apache.hudi.callback.HoodieWriteCommitCallback;
+
+/**
+ * Exception thrown for any higher level errors when {@link HoodieWriteCommitCallback} is executing a callback.
+ */
+public class HoodieCommitCallbackException extends HoodieException {
+
+ public HoodieCommitCallbackException(String msg, Throwable e) {
+ super(msg, e);
+ }
+
+ public HoodieCommitCallbackException(String msg) {
+ super(msg);
+ }
+}
\ No newline at end of file
diff --git a/hudi-client/src/test/java/org/apache/hudi/callback/http/TestCallbackHttpClient.java b/hudi-client/src/test/java/org/apache/hudi/callback/http/TestCallbackHttpClient.java
new file mode 100644
index 0000000..616dc31
--- /dev/null
+++ b/hudi-client/src/test/java/org/apache/hudi/callback/http/TestCallbackHttpClient.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.callback.http;
+
+import org.apache.http.StatusLine;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.hudi.callback.client.http.HoodieWriteCommitHttpCallbackClient;
+import org.apache.log4j.AppenderSkeleton;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.spi.LoggingEvent;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for {@link HoodieWriteCommitHttpCallbackClient}.
+ */
+@ExtendWith(MockitoExtension.class)
+public class TestCallbackHttpClient {
+
+ @Mock
+ AppenderSkeleton appender;
+
+ @Captor
+ ArgumentCaptor<LoggingEvent> logCaptor;
+
+ @Mock
+ CloseableHttpClient httpClient;
+
+ @Mock
+ CloseableHttpResponse httpResponse;
+
+ @Mock
+ StatusLine statusLine;
+
+ private void mockResponse(int statusCode) {
+ when(statusLine.getStatusCode()).thenReturn(statusCode);
+ when(httpResponse.getStatusLine()).thenReturn(statusLine);
+ try {
+ when(httpClient.execute(any())).thenReturn(httpResponse);
+ } catch (IOException e) {
+ fail(e.getMessage(), e);
+ }
+ }
+
+ @Test
+ public void sendPayloadShouldLogWhenRequestFailed() throws IOException {
+ Logger.getRootLogger().addAppender(appender);
+ when(httpClient.execute(any())).thenThrow(IOException.class);
+
+ HoodieWriteCommitHttpCallbackClient hoodieWriteCommitCallBackHttpClient =
+ new HoodieWriteCommitHttpCallbackClient("fake_api_key", "fake_url", httpClient);
+ hoodieWriteCommitCallBackHttpClient.send("{}");
+
+ verify(appender).doAppend(logCaptor.capture());
+ assertEquals("Failed to send callback.", logCaptor.getValue().getRenderedMessage());
+ assertEquals(Level.WARN, logCaptor.getValue().getLevel());
+ }
+
+ @Test
+ public void sendPayloadShouldLogUnsuccessfulSending() {
+ Logger.getRootLogger().addAppender(appender);
+ mockResponse(401);
+ when(httpResponse.toString()).thenReturn("unauthorized");
+
+ HoodieWriteCommitHttpCallbackClient hoodieWriteCommitCallBackHttpClient =
+ new HoodieWriteCommitHttpCallbackClient("fake_api_key", "fake_url", httpClient);
+ hoodieWriteCommitCallBackHttpClient.send("{}");
+
+ verify(appender).doAppend(logCaptor.capture());
+ assertEquals("Failed to send callback message. Response was unauthorized", logCaptor.getValue().getRenderedMessage());
+ assertEquals(Level.WARN, logCaptor.getValue().getLevel());
+ }
+
+ @Test
+ public void sendPayloadShouldLogSuccessfulSending() {
+ Logger.getRootLogger().addAppender(appender);
+ mockResponse(202);
+
+ HoodieWriteCommitHttpCallbackClient hoodieWriteCommitCallBackHttpClient =
+ new HoodieWriteCommitHttpCallbackClient("fake_api_key", "fake_url", httpClient);
+ hoodieWriteCommitCallBackHttpClient.send("{}");
+
+ verify(appender).doAppend(logCaptor.capture());
+ assertTrue(logCaptor.getValue().getRenderedMessage().startsWith("Sent Callback data"));
+ assertEquals(Level.INFO, logCaptor.getValue().getLevel());
+ }
+
+}
diff --git a/packaging/hudi-spark-bundle/pom.xml b/packaging/hudi-spark-bundle/pom.xml
index e0e60a9..f4affca 100644
--- a/packaging/hudi-spark-bundle/pom.xml
+++ b/packaging/hudi-spark-bundle/pom.xml
@@ -78,6 +78,7 @@
<include>org.jetbrains.kotlin:*</include>
<include>org.rocksdb:rocksdbjni</include>
<include>org.apache.httpcomponents:httpclient</include>
+ <include>org.apache.httpcomponents:httpcore</include>
<include>org.apache.httpcomponents:fluent-hc</include>
<include>org.antlr:stringtemplate</include>
<include>org.apache.parquet:parquet-avro</include>
diff --git a/packaging/hudi-utilities-bundle/pom.xml b/packaging/hudi-utilities-bundle/pom.xml
index 2da82f2..8cf3407 100644
--- a/packaging/hudi-utilities-bundle/pom.xml
+++ b/packaging/hudi-utilities-bundle/pom.xml
@@ -81,6 +81,7 @@
<include>org.jetbrains.kotlin:*</include>
<include>org.rocksdb:rocksdbjni</include>
<include>org.apache.httpcomponents:httpclient</include>
+ <include>org.apache.httpcomponents:httpcore</include>
<include>org.apache.httpcomponents:fluent-hc</include>
<include>org.antlr:stringtemplate</include>
<include>org.apache.parquet:parquet-avro</include>