You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/07/30 08:14:39 UTC

[rocketmq-schema-registry] 21/27: add http client for rocketmq schema registry

This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git

commit 3c51ffff7e93fa3ef76d1dd511e9b8f5d7e2a6ba
Author: hankunming <ha...@xiaomi.com>
AuthorDate: Wed Jul 20 21:03:19 2022 +0800

    add http client for rocketmq schema registry
---
 client/pom.xml                                     |  27 ++++
 .../client/NormalSchemaRegistryClient.java         |  65 ++++++++
 .../registry/client/SchemaRegistryClient.java      |  59 +++++++
 .../client/exceptions/RestClientException.java     |  38 +++++
 .../schema/registry/client/rest/JacksonMapper.java |  27 ++++
 .../schema/registry/client/rest/RestService.java   |  96 ++++++++++++
 .../schema/registry/client/rest/UrlBuilder.java    | 170 +++++++++++++++++++++
 .../schema/registry/client/util/HttpUtil.java      |  97 ++++++++++++
 pom.xml                                            |   1 +
 9 files changed, 580 insertions(+)

diff --git a/client/pom.xml b/client/pom.xml
new file mode 100644
index 0000000..2ee7881
--- /dev/null
+++ b/client/pom.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-schema-registry-all</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>0.0.2-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>client</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>common</artifactId>
+            <version>0.0.2-SNAPSHOT</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+</project>
\ No newline at end of file
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
new file mode 100644
index 0000000..522b6a2
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.rest.JacksonMapper;
+import org.apache.rocketmq.schema.registry.client.rest.RestService;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * {@link SchemaRegistryClient} that forwards every call unchanged to a {@link RestService}.
+ */
+public class NormalSchemaRegistryClient implements SchemaRegistryClient {
+
+    private final RestService restService;
+
+    /**
+     * Creates a client backed by the given REST service.
+     *
+     * @param restService service every call is forwarded to; must not be {@code null}
+     * @throws NullPointerException if {@code restService} is {@code null}
+     */
+    public NormalSchemaRegistryClient(RestService restService) {
+        // Fail fast here rather than with an unexplained NPE on the first request.
+        this.restService = Objects.requireNonNull(restService, "restService");
+    }
+
+    /** Forwards to {@code RestService.registerSchema} with the same arguments. */
+    @Override
+    public SchemaDto registerSchema(String clusterName, String tenant, String subjectName,
+                                    String schemaName, SchemaDto schemaDto) throws IOException, RestClientException {
+        return restService.registerSchema(clusterName, tenant, subjectName, schemaName, schemaDto);
+    }
+
+    /** Forwards to {@code RestService.deleteSchema} with the same arguments. */
+    @Override
+    public SchemaDto deleteSchema(String tenant, String schemaName) throws IOException, RestClientException {
+        return restService.deleteSchema(tenant, schemaName);
+    }
+
+    /** Forwards to {@code RestService.updateSchema} with the same arguments. */
+    @Override
+    public SchemaDto updateSchema(String cluster, String tenant, String subjectName,
+                                  String schemaName, SchemaDto schemaDto) throws IOException, RestClientException {
+        return restService.updateSchema(cluster, tenant, subjectName, schemaName, schemaDto);
+    }
+
+    /** Forwards to {@code RestService.getSchema} with the same arguments. */
+    @Override
+    public SchemaDto getSchema(String cluster, String tenant,
+                               String subject, String schemaName) throws IOException, RestClientException {
+        return restService.getSchema(cluster, tenant, subject, schemaName);
+    }
+
+    /** Forwards to {@code RestService.getSchemaBySubject} with the same arguments. */
+    @Override
+    public SchemaRecordDto getSchemaBySubject(String cluster, String subject) throws IOException, RestClientException {
+        return restService.getSchemaBySubject(cluster, subject);
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
new file mode 100644
index 0000000..343f0d8
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client;
+
+import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+
+import java.io.IOException;
+
+/**
+ * Operations a schema registry client offers; the shorter overloads fill in default names.
+ */
+public interface SchemaRegistryClient {
+
+    /** Tenant name used by the overloads that take no tenant. */
+    String DEFAULT_TENANT = "default";
+
+    /** Cluster name used by the overloads that take no cluster. */
+    String DEFAULT_CLUSTER = "default";
+
+    /** Same as the five-argument overload with {@link #DEFAULT_TENANT} as tenant. */
+    default SchemaDto registerSchema(String clusterName, String subjectName, String schemaName,
+                                     SchemaDto schemaDto) throws IOException, RestClientException {
+        return registerSchema(clusterName, DEFAULT_TENANT, subjectName, schemaName, schemaDto);
+    }
+
+    /** Registers {@code schemaDto} under the given cluster, tenant, subject and schema name. */
+    SchemaDto registerSchema(String clusterName, String tenant, String subjectName, String schemaName,
+                             SchemaDto schemaDto) throws IOException, RestClientException;
+
+    /** Deletes the schema {@code schemaName} of {@code tenant}. */
+    SchemaDto deleteSchema(String tenant, String schemaName) throws IOException, RestClientException;
+
+    /** Same as the five-argument overload with {@link #DEFAULT_TENANT} as tenant. */
+    default SchemaDto updateSchema(String cluster, String subjectName, String schemaName,
+                                   SchemaDto schemaDto) throws IOException, RestClientException {
+        return updateSchema(cluster, DEFAULT_TENANT, subjectName, schemaName, schemaDto);
+    }
+
+    /** Updates the schema identified by cluster, tenant, subject and schema name. */
+    SchemaDto updateSchema(String cluster, String tenant, String subjectName, String schemaName,
+                           SchemaDto schemaDto) throws IOException, RestClientException;
+
+    /** Fetches the schema identified by cluster, tenant, subject and schema name. */
+    SchemaDto getSchema(String cluster, String tenant, String subject,
+                        String schemaName) throws IOException, RestClientException;
+
+    /** Same as the two-argument overload with {@link #DEFAULT_CLUSTER} as cluster. */
+    default SchemaRecordDto getSchemaBySubject(String subject) throws IOException, RestClientException {
+        return getSchemaBySubject(DEFAULT_CLUSTER, subject);
+    }
+
+    /** Fetches the schema record of {@code subject} in {@code cluster}. */
+    SchemaRecordDto getSchemaBySubject(String cluster, String subject) throws IOException, RestClientException;
+
+    /** Optional operation; this default always throws {@link UnsupportedOperationException}. */
+    default SchemaRecordDto getSchemaById(long schemaId) {
+        throw new UnsupportedOperationException("getSchemaById is not supported by this client");
+    }
+
+    /** Optional operation; this default always throws {@link UnsupportedOperationException}. */
+    default SchemaRecordDto getSchemaBySubjectAndId(String subject, long schemaId) {
+        throw new UnsupportedOperationException("getSchemaBySubjectAndId is not supported by this client");
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/exceptions/RestClientException.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/exceptions/RestClientException.java
new file mode 100644
index 0000000..1d40703
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/exceptions/RestClientException.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.exceptions;
+
+/**
+ * Checked exception that carries a status value and an error code alongside its message.
+ */
+public class RestClientException extends Exception {
+
+    private static final long serialVersionUID = 1L;
+
+    private final int status;
+    private final int errorCode;
+
+    /**
+     * Creates an exception whose message is {@code message} followed by the error code.
+     *
+     * @param message   description of the failure
+     * @param status    value returned by {@link #getStatus()}
+     * @param errorCode value returned by {@link #getErrorCode()}
+     */
+    public RestClientException(final String message, final int status, final int errorCode) {
+        super(message + "; error code: " + errorCode);
+        this.status = status;
+        this.errorCode = errorCode;
+    }
+
+    /**
+     * Creates an exception like the three-argument constructor and also records its cause.
+     *
+     * @param message   description of the failure
+     * @param status    value returned by {@link #getStatus()}
+     * @param errorCode value returned by {@link #getErrorCode()}
+     * @param cause     underlying failure, or {@code null} if there is none
+     */
+    public RestClientException(final String message, final int status, final int errorCode,
+                               final Throwable cause) {
+        super(message + "; error code: " + errorCode, cause);
+        this.status = status;
+        this.errorCode = errorCode;
+    }
+
+    /** Returns the status value given at construction. */
+    public int getStatus() {
+        return status;
+    }
+
+    /** Returns the error code given at construction. */
+    public int getErrorCode() {
+        return errorCode;
+    }
+}
\ No newline at end of file
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/JacksonMapper.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/JacksonMapper.java
new file mode 100644
index 0000000..40accdf
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/JacksonMapper.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.rest;
+
+import com.fasterxml.jackson.core.json.JsonReadFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+
+/**
+ * Holder of the {@link ObjectMapper} shared by the client's REST classes.
+ */
+public final class JacksonMapper {
+
+    /** Shared mapper, built with {@link JsonReadFeature#ALLOW_NON_NUMERIC_NUMBERS} enabled. */
+    public static final ObjectMapper INSTANCE = JsonMapper.builder()
+            .enable(JsonReadFeature.ALLOW_NON_NUMERIC_NUMBERS)
+            .build();
+
+    private JacksonMapper() {
+        // static holder only; never instantiated
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
new file mode 100644
index 0000000..42d45ba
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.rest;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.util.HttpUtil;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Builds the schema registry request URLs and sends the requests through {@link HttpUtil}.
+ */
+public class RestService {
+    private static final TypeReference<SchemaDto> SCHEMA_DTO_TYPE_REFERENCE =
+            new TypeReference<SchemaDto>() {
+            };
+    private static final TypeReference<SchemaRecordDto> SCHEMA_RECORD_DTO_TYPE_REFERENCE =
+            new TypeReference<SchemaRecordDto>() {
+            };
+
+    public static ObjectMapper jsonParser = JacksonMapper.INSTANCE;
+
+    private static final String HTTP_GET = "GET";
+    private static final String HTTP_POST = "POST";
+    private static final String HTTP_PUT = "PUT";
+    private static final String HTTP_DELETE = "DELETE";
+
+    /** Path template shared by the register, update and get schema requests. */
+    private static final String SCHEMA_PATH =
+            "/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/{schema-name}";
+    /** Path template of the delete schema request. */
+    private static final String TENANT_SCHEMA_PATH = "/tenant/{tenant-name}/schema/{schema-name}";
+    /** Path template of the get-by-subject request. */
+    private static final String SUBJECT_PATH = "/cluster/{cluster-name}/subject/{subject-name}";
+
+    private final String baseUri;
+    private final Map<String, String> httpHeaders;
+
+    /**
+     * Creates a service that sends {@code Content-Type: application/json} with every request.
+     *
+     * @param baseUri prefix of every request URL; trailing slashes are ignored
+     * @throws NullPointerException if {@code baseUri} is {@code null}
+     */
+    public RestService(String baseUri) {
+        this.baseUri = normalizeBaseUri(baseUri);
+        this.httpHeaders = new HashMap<>();
+        this.httpHeaders.put("Content-Type", "application/json");
+    }
+
+    /**
+     * Creates a service that sends the given headers with every request.
+     *
+     * @param baseUri     prefix of every request URL; trailing slashes are ignored
+     * @param httpHeaders request headers; copied, and {@code null} is treated as no headers
+     * @throws NullPointerException if {@code baseUri} is {@code null}
+     */
+    public RestService(String baseUri, Map<String, String> httpHeaders) {
+        this.baseUri = normalizeBaseUri(baseUri);
+        // Copy so that later changes to the caller's map cannot alter this service's requests.
+        this.httpHeaders = new HashMap<>();
+        if (httpHeaders != null) {
+            this.httpHeaders.putAll(httpHeaders);
+        }
+    }
+
+    /** Sends {@code schemaDto} as JSON in a POST request to the schema path. */
+    public SchemaDto registerSchema(String clusterName, String tenant, String subjectName,
+                                    String schemaName, SchemaDto schemaDto) throws IOException, RestClientException {
+        String path = url(SCHEMA_PATH, clusterName, tenant, subjectName, schemaName);
+        String data = jsonParser.writeValueAsString(schemaDto);
+        return HttpUtil.sendHttpRequest(path, HTTP_POST, data, httpHeaders, SCHEMA_DTO_TYPE_REFERENCE);
+    }
+
+    /** Sends a DELETE request for the schema {@code schemaName} of {@code tenant}. */
+    public SchemaDto deleteSchema(String tenant, String schemaName) throws IOException, RestClientException {
+        String path = url(TENANT_SCHEMA_PATH, tenant, schemaName);
+        return HttpUtil.sendHttpRequest(path, HTTP_DELETE, null, httpHeaders, SCHEMA_DTO_TYPE_REFERENCE);
+    }
+
+    /** Sends {@code schemaDto} as JSON in a PUT request to the schema path. */
+    public SchemaDto updateSchema(String cluster, String tenant, String subject,
+                                  String schemaName, SchemaDto schemaDto) throws IOException, RestClientException {
+        String path = url(SCHEMA_PATH, cluster, tenant, subject, schemaName);
+        String data = jsonParser.writeValueAsString(schemaDto);
+        return HttpUtil.sendHttpRequest(path, HTTP_PUT, data, httpHeaders, SCHEMA_DTO_TYPE_REFERENCE);
+    }
+
+    /** Sends a GET request to the schema path and returns the parsed {@link SchemaDto}. */
+    public SchemaDto getSchema(String cluster, String tenant,
+                               String subject, String schemaName) throws IOException, RestClientException {
+        String path = url(SCHEMA_PATH, cluster, tenant, subject, schemaName);
+        return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders, SCHEMA_DTO_TYPE_REFERENCE);
+    }
+
+    /** Sends a GET request to the subject path and returns the parsed {@link SchemaRecordDto}. */
+    public SchemaRecordDto getSchemaBySubject(String cluster, String subject) throws IOException, RestClientException {
+        String path = url(SUBJECT_PATH, cluster, subject);
+        return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders, SCHEMA_RECORD_DTO_TYPE_REFERENCE);
+    }
+
+    /** Expands {@code template} with the percent-encoded {@code values} and prefixes the base URI. */
+    private String url(String template, Object... values) {
+        return baseUri + UrlBuilder.fromPath(template).build(values).toString();
+    }
+
+    /** Rejects {@code null} and drops trailing slashes so that joining with a path yields no "//". */
+    private static String normalizeBaseUri(String baseUri) {
+        String uri = Objects.requireNonNull(baseUri, "baseUri");
+        while (uri.endsWith("/")) {
+            uri = uri.substring(0, uri.length() - 1);
+        }
+        return uri;
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/UrlBuilder.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/UrlBuilder.java
new file mode 100644
index 0000000..dd22f3e
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/UrlBuilder.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.rest;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * Expands the {@code {name}} placeholders of a path template with percent-encoded values and
+ * appends any query parameters that were added.
+ */
+public class UrlBuilder {
+    /** Percent-encoder for the values substituted into the path. */
+    static class UriPercentEncoder {
+        static final String CHARS_UNENCODE;
+        private static final BitSet UNENCODE;
+        private static final char[] HEX_DIGITS = "0123456789ABCDEF".toCharArray();
+
+        static {
+            // 2.2. General delimiters
+            String gendelims = "@:";
+            // 2.2. Subdelimiters
+            String subdelims = "!$&\'()*+,;=";
+            // 2.3. Unreserved Characters
+            String alpha = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+            String unreserved = "-._~";
+
+            CHARS_UNENCODE = alpha + unreserved + gendelims + subdelims;
+
+            BitSet unencode = new BitSet(256);
+            for (int i = 0; i < CHARS_UNENCODE.length(); i++) {
+                unencode.set(CHARS_UNENCODE.charAt(i));
+            }
+            UNENCODE = unencode;
+        }
+
+        /**
+         * Percent-encodes {@code value}: the text is converted to bytes with {@code charset} and
+         * every byte that is not one of {@link #CHARS_UNENCODE} is written as {@code %XX}.
+         *
+         * @param value   text to encode
+         * @param charset charset used to turn {@code value} into bytes
+         * @return the encoded text
+         */
+        static String encode(String value, Charset charset) {
+            // Work on bytes, not chars: a char above 0xFF must become several %XX escapes and
+            // must never be matched against the table by its low byte only.
+            byte[] octets = value.getBytes(charset);
+            StringBuilder sb = new StringBuilder(octets.length * 3);
+            for (byte octet : octets) {
+                int unsigned = octet & 0xFF;
+                if (UNENCODE.get(unsigned)) {
+                    sb.append((char) unsigned);
+                } else {
+                    sb.append('%').append(HEX_DIGITS[unsigned >>> 4]).append(HEX_DIGITS[unsigned & 0x0F]);
+                }
+            }
+            return sb.toString();
+        }
+
+    }
+
+    private final String templatePath;
+
+    private final StringBuilder queryParamString = new StringBuilder();
+    private final List<String> templateNames;
+
+    /**
+     * Creates a builder for the given path template.
+     *
+     * @param templatePath path containing zero or more {@code {name}} placeholders
+     * @throws NullPointerException if {@code templatePath} is {@code null}
+     */
+    public UrlBuilder(String templatePath) {
+        this.templatePath = Objects.requireNonNull(templatePath);
+        this.templateNames = findNamesInTemplate(templatePath);
+    }
+
+    /** Creates a builder for {@code path}; equivalent to calling the constructor. */
+    public static UrlBuilder fromPath(String path) {
+        return new UrlBuilder(path);
+    }
+
+    /**
+     * Substitutes the placeholders, in order of appearance, with the UTF-8 percent-encoded string
+     * form of the given values and appends the query parameters added so far.
+     *
+     * @param templatePathValues one value per placeholder
+     * @return the resulting URI
+     * @throws IllegalArgumentException if the number of values differs from the number of
+     *                                  placeholders, or the result is not a valid URI
+     */
+    public URI build(Object... templatePathValues) {
+        if (templatePathValues.length != templateNames.size()) {
+            throw new IllegalArgumentException("Mismatched number of template variable names: expected "
+                    + templateNames.size() + ", got " + templatePathValues.length);
+        }
+
+        // Single left-to-right pass: each placeholder is replaced exactly once, so a name that
+        // occurs twice in the template receives both of its values.
+        StringBuilder url = new StringBuilder(templatePath.length() + 16);
+        int cursor = 0;
+        for (int i = 0; i < templateNames.size(); i++) {
+            String name = templateNames.get(i);
+            int start = templatePath.indexOf(name, cursor);
+            String value = String.valueOf(templatePathValues[i]);
+            url.append(templatePath, cursor, start)
+                    .append(UriPercentEncoder.encode(value, StandardCharsets.UTF_8));
+            cursor = start + name.length();
+        }
+        url.append(templatePath, cursor, templatePath.length());
+
+        if (queryParamString.length() > 0) {
+            if (url.indexOf("?") < 0) {
+                url.append('?');
+            } else {
+                // The path already carries a query string: continue it instead of gluing on.
+                char last = url.charAt(url.length() - 1);
+                if (last != '?' && last != '&') {
+                    url.append('&');
+                }
+            }
+            url.append(queryParamString);
+        }
+
+        try {
+            return new URI(url.toString());
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    /** Adds a query parameter; name and value are UTF-8 percent-encoded. */
+    public UrlBuilder queryParam(String paramName, String paramValue) {
+        if (queryParamString.length() > 0) {
+            queryParamString.append('&');
+        }
+        try {
+            queryParamString.append(encodeQueryParameter(paramName)).append('=')
+                    .append(encodeQueryParameter(paramValue));
+        } catch (UnsupportedEncodingException e) {
+            throw new IllegalArgumentException(e);
+        }
+        return this;
+    }
+
+    /** Adds a query parameter using {@link String#valueOf(Object)} of {@code paramValue}. */
+    public UrlBuilder queryParam(String paramName, Integer paramValue) {
+        queryParam(paramName, String.valueOf(paramValue));
+        return this;
+    }
+
+    /** Adds a query parameter whose value is {@code "true"} or {@code "false"}. */
+    public UrlBuilder queryParam(String paramName, boolean paramValue) {
+        queryParam(paramName, Boolean.toString(paramValue));
+        return this;
+    }
+
+    /** Returns the unexpanded template path, not the built URL. */
+    @Override
+    public String toString() {
+        return templatePath;
+    }
+
+    /** URL-encodes {@code paramValue} as UTF-8, writing spaces as {@code %20} instead of {@code +}. */
+    static String encodeQueryParameter(String paramValue) throws UnsupportedEncodingException {
+        return URLEncoder.encode(paramValue, "UTF-8")
+                /*
+                 * use percent-encoding which is supported everywhere as per RFC-3986, not
+                 * legacy RFC-1866
+                 */
+                .replace("+", "%20");
+    }
+
+    /** Returns every {@code {...}} placeholder of {@code path}, braces included, in order. */
+    static final List<String> findNamesInTemplate(String path) {
+        int counter = 0;
+        List<String> templateNames = new ArrayList<>();
+        StringBuilder sb = null;
+        while (counter < path.length()) {
+            char c = path.charAt(counter++);
+            if (c == '{' && sb == null) {
+                sb = new StringBuilder();
+            }
+            if (sb != null) {
+                sb.append(c);
+            }
+            if (c == '}' && sb != null) {
+                templateNames.add(sb.toString());
+                sb = null;
+            }
+        }
+        return Collections.unmodifiableList(templateNames);
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/util/HttpUtil.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/util/HttpUtil.java
new file mode 100644
index 0000000..9ae4495
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/util/HttpUtil.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.util;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.rest.JacksonMapper;
+
+import java.io.*;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+/**
+ * Sends JSON requests over {@link HttpURLConnection} and deserializes the JSON responses.
+ */
+public class HttpUtil {
+    public static ObjectMapper jsonParser = JacksonMapper.INSTANCE;
+    private static final int HTTP_CONNECT_TIMEOUT_MS = 30000;
+    private static final int HTTP_READ_TIMEOUT_MS = 30000;
+    private static final int ERROR_CODE = 5001;
+
+    /**
+     * Sends a single HTTP request and maps the response to {@code T}.
+     *
+     * @param requestUrl        absolute URL of the request
+     * @param method            HTTP method, for example {@code "GET"}
+     * @param requestBodyData   body sent as UTF-8 bytes, or {@code null} to send no body
+     * @param requestProperties request headers; {@code null} means no headers
+     * @param responseFormat    type the response body is deserialized to
+     * @return the deserialized body for 200 and 201, {@code null} for 204
+     * @throws IOException         if the connection or the (de)serialization fails
+     * @throws RestClientException for every other response code
+     */
+    public static <T> T sendHttpRequest(String requestUrl, String method, String requestBodyData,
+                                        Map<String, String> requestProperties,
+                                        TypeReference<T> responseFormat)
+            throws IOException, RestClientException {
+
+        HttpURLConnection connection = null;
+        try {
+            connection = buildConnection(new URL(requestUrl), method, requestProperties);
+
+            if (requestBodyData != null) {
+                connection.setDoOutput(true);
+                // Encode explicitly: DataOutputStream.writeBytes drops the high byte of every
+                // char and would corrupt any non-ASCII text in the JSON body.
+                try (OutputStream os = connection.getOutputStream()) {
+                    os.write(requestBodyData.getBytes(StandardCharsets.UTF_8));
+                    os.flush();
+                }
+            }
+
+            int responseCode = connection.getResponseCode();
+            if (responseCode == HttpURLConnection.HTTP_OK || responseCode == HttpURLConnection.HTTP_CREATED) {
+                // try-with-resources closes the stream even when parsing fails
+                try (InputStream is = connection.getInputStream()) {
+                    return jsonParser.readValue(is, responseFormat);
+                }
+            }
+            if (responseCode == HttpURLConnection.HTTP_NO_CONTENT) {
+                return null;
+            }
+            throw new RestClientException(failureMessage(connection, responseCode), responseCode, ERROR_CODE);
+        } finally {
+            if (connection != null) {
+                connection.disconnect();
+            }
+        }
+    }
+
+    /** Builds the failure message from the response code and, when readable, the error body. */
+    private static String failureMessage(HttpURLConnection connection, int responseCode) {
+        StringBuilder message = new StringBuilder("send request failed, http status: ").append(responseCode);
+        try (InputStream es = connection.getErrorStream()) {
+            if (es != null) {
+                ByteArrayOutputStream body = new ByteArrayOutputStream();
+                byte[] chunk = new byte[1024];
+                for (int n = es.read(chunk); n != -1; n = es.read(chunk)) {
+                    body.write(chunk, 0, n);
+                }
+                if (body.size() > 0) {
+                    message.append(", response: ").append(new String(body.toByteArray(), StandardCharsets.UTF_8));
+                }
+            }
+        } catch (IOException ignored) {
+            // The error body is only extra diagnostics; the status alone is still reported.
+        }
+        return message.toString();
+    }
+
+    /** Opens the connection and applies the timeouts, the method and the request headers. */
+    private static HttpURLConnection buildConnection(URL url, String method,
+                                                     Map<String, String> requestProperties)
+            throws IOException {
+        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+
+        connection.setConnectTimeout(HTTP_CONNECT_TIMEOUT_MS);
+        connection.setReadTimeout(HTTP_READ_TIMEOUT_MS);
+
+        connection.setRequestMethod(method);
+        connection.setDoInput(true);
+
+        if (requestProperties != null) {
+            for (Map.Entry<String, String> entry : requestProperties.entrySet()) {
+                connection.setRequestProperty(entry.getKey(), entry.getValue());
+            }
+        }
+
+        connection.setUseCaches(false);
+
+        return connection;
+    }
+}
diff --git a/pom.xml b/pom.xml
index 425de31..f854f7d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,6 +21,7 @@
         <module>core</module>
         <module>storage-rocketmq</module>
         <module>war</module>
+        <module>client</module>
     </modules>
 
     <properties>