You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by du...@apache.org on 2022/07/31 08:39:21 UTC
[rocketmq-schema-registry] 21/37: add http client for rocketmq schema registry
This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-schema-registry.git
commit 3c51ffff7e93fa3ef76d1dd511e9b8f5d7e2a6ba
Author: hankunming <ha...@xiaomi.com>
AuthorDate: Wed Jul 20 21:03:19 2022 +0800
add http client for rocketmq schema registry
---
client/pom.xml | 27 ++++
.../client/NormalSchemaRegistryClient.java | 65 ++++++++
.../registry/client/SchemaRegistryClient.java | 59 +++++++
.../client/exceptions/RestClientException.java | 38 +++++
.../schema/registry/client/rest/JacksonMapper.java | 27 ++++
.../schema/registry/client/rest/RestService.java | 96 ++++++++++++
.../schema/registry/client/rest/UrlBuilder.java | 170 +++++++++++++++++++++
.../schema/registry/client/util/HttpUtil.java | 97 ++++++++++++
pom.xml | 1 +
9 files changed, 580 insertions(+)
diff --git a/client/pom.xml b/client/pom.xml
new file mode 100644
index 0000000..2ee7881
--- /dev/null
+++ b/client/pom.xml
@@ -0,0 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>rocketmq-schema-registry-all</artifactId>
+        <groupId>org.apache.rocketmq</groupId>
+        <version>0.0.2-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>client</artifactId>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>common</artifactId>
+            <!-- Sibling module: follow the version inherited from the parent so a release bump
+                 does not leave this dependency pointing at a stale snapshot. -->
+            <version>${project.version}</version>
+            <scope>compile</scope>
+        </dependency>
+    </dependencies>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+
+</project>
\ No newline at end of file
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
new file mode 100644
index 0000000..522b6a2
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/NormalSchemaRegistryClient.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.rest.JacksonMapper;
+import org.apache.rocketmq.schema.registry.client.rest.RestService;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+
+import java.io.IOException;
+
+public class NormalSchemaRegistryClient implements SchemaRegistryClient{
+
+ private final RestService restService;
+
+ public NormalSchemaRegistryClient(RestService restService) {
+ this.restService = restService;
+ }
+
+ @Override
+ public SchemaDto registerSchema(String clusterName, String tenant, String subjectName,
+ String schemaName, SchemaDto schemaDto) throws IOException, RestClientException {
+ return restService.registerSchema(clusterName, tenant, subjectName, schemaName, schemaDto);
+ }
+
+ @Override
+ public SchemaDto deleteSchema(String tenant, String schemaName) throws IOException, RestClientException {
+ return restService.deleteSchema(tenant, schemaName);
+ }
+
+ @Override
+ public SchemaDto updateSchema(String cluster, String tenant, String subjectName,
+ String schemaName, SchemaDto schemaDto) throws IOException, RestClientException {
+ return restService.updateSchema(cluster, tenant, subjectName, schemaName, schemaDto);
+ }
+
+ @Override
+ public SchemaDto getSchema(String cluster, String tenant,
+ String subject, String schemaName) throws IOException, RestClientException {
+ return restService.getSchema(cluster, tenant, subject, schemaName);
+ }
+
+ @Override
+ public SchemaRecordDto getSchemaBySubject(String cluster, String subject) throws IOException, RestClientException {
+ return restService.getSchemaBySubject(cluster, subject);
+ }
+
+}
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
new file mode 100644
index 0000000..343f0d8
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/SchemaRegistryClient.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client;
+
+import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+
+import java.io.IOException;
+
+public interface SchemaRegistryClient {
+
+ default SchemaDto registerSchema(String clusterName, String subjectName, String schemaName, SchemaDto schemaDto) throws IOException, RestClientException {
+ return registerSchema(clusterName, "default", subjectName, schemaName, schemaDto);
+ }
+
+ SchemaDto registerSchema(String clusterName, String tenant, String subjectName, String schemaName, SchemaDto schemaDto) throws IOException, RestClientException;
+
+
+ SchemaDto deleteSchema(String tenant, String schemaName) throws IOException, RestClientException;
+
+ default SchemaDto updateSchema(String cluster, String subjectName, String schemaName, SchemaDto schemaDto) throws IOException, RestClientException {
+ return updateSchema(cluster, "default", subjectName, schemaName, schemaDto);
+ }
+
+ SchemaDto updateSchema(String cluster, String tenant, String subjectName, String schemaName, SchemaDto schemaDto) throws IOException, RestClientException;
+
+ SchemaDto getSchema(String cluster, String tenant, String subject, String schemaName) throws IOException, RestClientException;
+
+ default SchemaRecordDto getSchemaBySubject(String subject) throws IOException, RestClientException {
+ return getSchemaBySubject("default", subject);
+ }
+
+ SchemaRecordDto getSchemaBySubject(String cluster, String subject) throws IOException, RestClientException;
+
+ default SchemaRecordDto getSchemaById(long schemaId) {
+ throw new UnsupportedOperationException();
+ }
+
+ default SchemaRecordDto getSchemaBySubjectAndId(String subject, long schemaId) {
+ throw new UnsupportedOperationException();
+ }
+
+}
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/exceptions/RestClientException.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/exceptions/RestClientException.java
new file mode 100644
index 0000000..1d40703
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/exceptions/RestClientException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.exceptions;
+
+public class RestClientException extends Exception {
+
+ private final int status;
+ private final int errorCode;
+
+ public RestClientException(final String message, final int status, final int errorCode) {
+ super(message + "; error code: " + errorCode);
+ this.status = status;
+ this.errorCode = errorCode;
+ }
+
+ public int getStatus() {
+ return status;
+ }
+
+ public int getErrorCode() {
+ return errorCode;
+ }
+}
\ No newline at end of file
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/JacksonMapper.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/JacksonMapper.java
new file mode 100644
index 0000000..40accdf
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/JacksonMapper.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.rest;
+
+import com.fasterxml.jackson.core.json.JsonReadFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.json.JsonMapper;
+
+public class JacksonMapper {
+ public static final ObjectMapper INSTANCE = JsonMapper.builder().
+ enable(JsonReadFeature.ALLOW_NON_NUMERIC_NUMBERS).build();
+}
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
new file mode 100644
index 0000000..42d45ba
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/RestService.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.rest;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.util.HttpUtil;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaDto;
+import org.apache.rocketmq.schema.registry.common.dto.SchemaRecordDto;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class RestService {
+ private static final TypeReference<SchemaDto> SCHEMA_DTO_TYPE_REFERENCE =
+ new TypeReference<SchemaDto>() {
+ };
+ private static final TypeReference<SchemaRecordDto> SCHEMA_RECORD_DTO_TYPE_REFERENCE =
+ new TypeReference<SchemaRecordDto>() {
+ };
+
+ public static ObjectMapper jsonParser = JacksonMapper.INSTANCE;
+
+ private static final String HTTP_GET = "GET";
+ private static final String HTTP_POST = "POST";
+ private static final String HTTP_PUT = "PUT";
+ private static final String HTTP_DELETE = "DELETE";
+
+ private final String baseUri;
+ private final Map<String, String> httpHeaders;
+
+ public RestService(String baseUri) {
+ this.baseUri = baseUri;
+ httpHeaders = new HashMap<>();
+ httpHeaders.put("Content-Type", "application/json");
+ }
+
+ public RestService(String baseUri, Map<String, String> httpHeaders) {
+ this.baseUri = baseUri;
+ this.httpHeaders = httpHeaders;
+ }
+
+ public SchemaDto registerSchema(String clusterName, String tenant, String subjectName,
+ String schemaName, SchemaDto schemaDto) throws IOException, RestClientException {
+ UrlBuilder urlBuilder = UrlBuilder.fromPath("/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/{schema-name}");
+ String path = baseUri + urlBuilder.build(clusterName, tenant, subjectName, schemaName).toString();
+ String data = jsonParser.writeValueAsString(schemaDto);
+ return HttpUtil.sendHttpRequest(path, HTTP_POST, data, httpHeaders, SCHEMA_DTO_TYPE_REFERENCE);
+ }
+
+ public SchemaDto deleteSchema(String tenant, String schemaName) throws IOException, RestClientException {
+ UrlBuilder urlBuilder = UrlBuilder.fromPath("/tenant/{tenant-name}/schema/{schema-name}");
+ String path = baseUri + urlBuilder.build(tenant, schemaName).toString();
+ return HttpUtil.sendHttpRequest(path, HTTP_DELETE, null, httpHeaders, SCHEMA_DTO_TYPE_REFERENCE);
+ }
+
+ public SchemaDto updateSchema(String cluster, String tenant, String subject,
+ String schemaName, SchemaDto schemaDto) throws IOException, RestClientException {
+ UrlBuilder urlBuilder = UrlBuilder.fromPath("/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/{schema-name}");
+ String path = baseUri + urlBuilder.build(cluster, tenant, subject, schemaName).toString();
+ String data = jsonParser.writeValueAsString(schemaDto);
+ return HttpUtil.sendHttpRequest(path, HTTP_PUT, data, httpHeaders, SCHEMA_DTO_TYPE_REFERENCE);
+ }
+
+ public SchemaDto getSchema(String cluster, String tenant,
+ String subject, String schemaName) throws IOException, RestClientException {
+ UrlBuilder urlBuilder = UrlBuilder.fromPath("/cluster/{cluster-name}/tenant/{tenant-name}/subject/{subject-name}/schema/{schema-name}");
+ String path = baseUri + urlBuilder.build(cluster, tenant, subject, schemaName).toString();
+ return HttpUtil.sendHttpRequest(path, HTTP_GET , null, httpHeaders, SCHEMA_DTO_TYPE_REFERENCE);
+ }
+
+ public SchemaRecordDto getSchemaBySubject(String cluster, String subject) throws IOException, RestClientException {
+ UrlBuilder urlBuilder = UrlBuilder.fromPath("/cluster/{cluster-name}/subject/{subject-name}");
+ String path = baseUri + urlBuilder.build(cluster, subject).toString();
+ return HttpUtil.sendHttpRequest(path, HTTP_GET, null, httpHeaders, SCHEMA_RECORD_DTO_TYPE_REFERENCE);
+ }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/UrlBuilder.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/UrlBuilder.java
new file mode 100644
index 0000000..dd22f3e
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/rest/UrlBuilder.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.rest;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URLEncoder;
+import java.nio.charset.Charset;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * Builds a {@link URI} from a path template containing {@code {name}} placeholders plus optional
+ * query parameters.
+ *
+ * <p>Placeholder values are percent-encoded as UTF-8 before substitution; query parameter names
+ * and values are encoded with {@link URLEncoder} (UTF-8) with {@code '+'} rewritten to
+ * {@code %20}.
+ */
+public class UrlBuilder {
+    /**
+     * Percent-encoder for values substituted into the path. Characters listed in
+     * {@link #CHARS_UNENCODE} are copied through; everything else is encoded.
+     */
+    static class UriPercentEncoder {
+        static final String CHARS_UNENCODE;
+        private static final BitSet UNENCODE;
+        private static final char[] HEX_DIGITS = "0123456789ABCDEF".toCharArray();
+
+        static {
+            // 2.2. General delimiters
+            String gendelims = "@:";
+            // 2.2. Subdelimiters
+            String subdelims = "!$&\'()*+,;=";
+            // 2.3. Unreserved Characters
+            String alpha = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+            String unreserved = "-._~";
+
+            CHARS_UNENCODE = alpha + unreserved + gendelims + subdelims;
+
+            BitSet unencode = new BitSet(256);
+            for (int i = 0; i < CHARS_UNENCODE.length(); i++) {
+                unencode.set(CHARS_UNENCODE.charAt(i));
+            }
+            UNENCODE = unencode;
+        }
+
+        /**
+         * Percent-encodes {@code value}.
+         *
+         * <p>Each code point that is not in the pass-through set is converted to bytes with
+         * {@code charset} and every byte is written as {@code %XX} (upper-case hex). Only code
+         * points below 0x80 are looked up in the pass-through set, so a non-ASCII character can
+         * never be mistaken for an ASCII one that happens to share its low byte.
+         *
+         * @param value   text to encode; must not be {@code null}
+         * @param charset charset used to turn encoded characters into bytes
+         * @return the encoded text
+         */
+        static String encode(String value, Charset charset) {
+            StringBuilder sb = new StringBuilder(value.length() * 2);
+            int i = 0;
+            while (i < value.length()) {
+                int codePoint = value.codePointAt(i);
+                // A supplementary code point spans two chars; encode it as one unit.
+                int width = Character.charCount(codePoint);
+                if (codePoint < 0x80 && UNENCODE.get(codePoint)) {
+                    sb.append((char) codePoint);
+                } else {
+                    for (byte b : value.substring(i, i + width).getBytes(charset)) {
+                        sb.append('%')
+                            .append(HEX_DIGITS[(b >> 4) & 0x0F])
+                            .append(HEX_DIGITS[b & 0x0F]);
+                    }
+                }
+                i += width;
+            }
+            return sb.toString();
+        }
+
+    }
+
+    /** Charset for path values; fixed so the result does not depend on the platform default. */
+    private static final Charset PATH_CHARSET = Charset.forName("UTF-8");
+
+    private final String templatePath;
+
+    private final StringBuilder queryParamString = new StringBuilder();
+    private final List<String> templateNames;
+
+    /**
+     * @param templatePath path with {@code {name}} placeholders; must not be {@code null}
+     */
+    public UrlBuilder(String templatePath) {
+        this.templatePath = Objects.requireNonNull(templatePath);
+        this.templateNames = findNamesInTemplate(templatePath);
+    }
+
+    /** Static factory equivalent to {@link #UrlBuilder(String)}. */
+    public static UrlBuilder fromPath(String path) {
+        return new UrlBuilder(path);
+    }
+
+    /**
+     * Substitutes the placeholders, in order of appearance, with the encoded string form of the
+     * given values and appends any query parameters added so far.
+     *
+     * @param templatePathValues one value per placeholder; each is converted with
+     *                           {@link String#valueOf(Object)}
+     * @return the resulting URI
+     * @throws IllegalArgumentException if the number of values differs from the number of
+     *                                  placeholders, or the result is not a valid URI
+     */
+    public URI build(Object... templatePathValues) {
+
+        List<String> templateValues = Arrays.stream(templatePathValues)
+            .map(o -> UriPercentEncoder.encode(String.valueOf(o), PATH_CHARSET))
+            .collect(Collectors.toList());
+        if (templateValues.size() != this.templateNames.size()) {
+            throw new IllegalArgumentException("Mismatched number of template variable names: expected "
+                + this.templateNames.size() + ", got " + templateValues.size());
+        }
+
+        String encodedPath = templatePath;
+        for (int i = 0; i < templateNames.size(); i++) {
+            encodedPath = encodedPath.replace(templateNames.get(i), templateValues.get(i));
+        }
+
+        if (queryParamString.length() > 0) {
+            if (encodedPath.indexOf('?') < 0) {
+                encodedPath += '?';
+            }
+            encodedPath += queryParamString;
+        }
+
+        try {
+            return new URI(encodedPath);
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+
+    /**
+     * Appends an encoded {@code name=value} pair to the query string.
+     *
+     * @return this builder
+     */
+    public UrlBuilder queryParam(String paramName, String paramValue) {
+        if (queryParamString.length() > 0) {
+            queryParamString.append('&');
+        }
+        try {
+            queryParamString.append(encodeQueryParameter(paramName)).append('=')
+                .append(encodeQueryParameter(paramValue));
+        } catch (UnsupportedEncodingException e) {
+            throw new IllegalArgumentException(e);
+        }
+        return this;
+    }
+
+    /** Appends a query parameter whose value is {@code String.valueOf(paramValue)}. */
+    public UrlBuilder queryParam(String paramName, Integer paramValue) {
+        queryParam(paramName, String.valueOf(paramValue));
+        return this;
+    }
+
+    /** Appends a query parameter whose value is {@code "true"} or {@code "false"}. */
+    public UrlBuilder queryParam(String paramName, boolean paramValue) {
+        queryParam(paramName, Boolean.toString(paramValue));
+        return this;
+    }
+
+    /** Returns the unexpanded template path, not a built URL. */
+    @Override
+    public String toString() {
+        return templatePath;
+    }
+
+    static String encodeQueryParameter(String paramValue) throws UnsupportedEncodingException {
+        return URLEncoder.encode(paramValue, "UTF-8")
+            /*
+             * use percent-encoding which is supported everywhere as per RFC-3986, not
+             * legacy RFC-1866
+             */
+            .replace("+", "%20");
+    }
+
+    /**
+     * Collects every {@code {...}} placeholder of {@code path}, braces included, in order of
+     * appearance.
+     *
+     * @return an unmodifiable list of placeholders
+     */
+    static final List<String> findNamesInTemplate(String path) {
+        int counter = 0;
+        List<String> templateNames = new ArrayList<>();
+        StringBuilder sb = null;
+        while (counter < path.length()) {
+            char c = path.charAt(counter++);
+            if (c == '{' && sb == null) {
+                sb = new StringBuilder();
+            }
+            if (sb != null) {
+                sb.append(c);
+            }
+            if (c == '}' && sb != null) {
+                templateNames.add(sb.toString());
+                sb = null;
+            }
+        }
+        return Collections.unmodifiableList(templateNames);
+    }
+}
diff --git a/client/src/main/java/org/apache/rocketmq/schema/registry/client/util/HttpUtil.java b/client/src/main/java/org/apache/rocketmq/schema/registry/client/util/HttpUtil.java
new file mode 100644
index 0000000..9ae4495
--- /dev/null
+++ b/client/src/main/java/org/apache/rocketmq/schema/registry/client/util/HttpUtil.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.schema.registry.client.util;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.rocketmq.schema.registry.client.exceptions.RestClientException;
+import org.apache.rocketmq.schema.registry.client.rest.JacksonMapper;
+
+import java.io.*;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Map;
+
+/**
+ * Minimal HTTP helper built on {@link HttpURLConnection}: sends one request with an optional
+ * body and deserializes a JSON response.
+ */
+public class HttpUtil {
+    public static ObjectMapper jsonParser = JacksonMapper.INSTANCE;
+    private static final int HTTP_CONNECT_TIMEOUT_MS = 30000;
+    private static final int HTTP_READ_TIMEOUT_MS = 30000;
+    private static final int ERROR_CODE = 5001;
+
+    /**
+     * Sends a single HTTP request and maps the response.
+     *
+     * @param requestUrl        absolute URL to call
+     * @param method            HTTP method name, e.g. {@code "GET"}
+     * @param requestBodyData   request body, sent as UTF-8 bytes; {@code null} sends no body
+     * @param requestProperties request headers; {@code null} is treated as no headers
+     * @param responseFormat    target type of the response body
+     * @param <T>               type the response body is deserialized into
+     * @return the deserialized body for 200/201, or {@code null} for 204
+     * @throws IOException         if the URL is malformed, the exchange fails, or the body
+     *                             cannot be parsed
+     * @throws RestClientException for any other response code; carries that code as its status
+     */
+    public static <T> T sendHttpRequest(String requestUrl, String method, String requestBodyData,
+                                        Map<String, String> requestProperties,
+                                        TypeReference<T> responseFormat)
+        throws IOException, RestClientException {
+
+        HttpURLConnection connection = null;
+        try {
+            URL url = new URL(requestUrl);
+
+            connection = buildConnection(url, method, requestProperties);
+
+            if (requestBodyData != null) {
+                // Encode explicitly: DataOutputStream.writeBytes keeps only the low byte of
+                // each char and would corrupt any non-ASCII character in the JSON body.
+                byte[] payload = requestBodyData.getBytes("UTF-8");
+                connection.setDoOutput(true);
+                try (OutputStream os = connection.getOutputStream()) {
+                    os.write(payload);
+                    os.flush();
+                }
+            }
+
+            int responseCode = connection.getResponseCode();
+            if (responseCode == HttpURLConnection.HTTP_OK || responseCode == HttpURLConnection.HTTP_CREATED) {
+                // try-with-resources so the stream is closed even when parsing throws.
+                try (InputStream is = connection.getInputStream()) {
+                    return jsonParser.readValue(is, responseFormat);
+                }
+            } else if (responseCode == HttpURLConnection.HTTP_NO_CONTENT) {
+                return null;
+            } else {
+                throw new RestClientException("send request failed", responseCode,
+                    ERROR_CODE);
+            }
+
+        } finally {
+            if (connection != null) {
+                connection.disconnect();
+            }
+        }
+    }
+
+    /**
+     * Opens a connection to {@code url} and applies timeouts, method, headers and cache policy.
+     * Nothing is sent until the caller writes the body or reads the response.
+     */
+    private static HttpURLConnection buildConnection(URL url, String method, Map<String,
+        String> requestProperties)
+        throws IOException {
+        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+
+        connection.setConnectTimeout(HTTP_CONNECT_TIMEOUT_MS);
+        connection.setReadTimeout(HTTP_READ_TIMEOUT_MS);
+
+        connection.setRequestMethod(method);
+        connection.setDoInput(true);
+
+        if (requestProperties != null) {
+            for (Map.Entry<String, String> entry : requestProperties.entrySet()) {
+                connection.setRequestProperty(entry.getKey(), entry.getValue());
+            }
+        }
+
+        connection.setUseCaches(false);
+
+        return connection;
+    }
+}
diff --git a/pom.xml b/pom.xml
index 425de31..f854f7d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -21,6 +21,7 @@
<module>core</module>
<module>storage-rocketmq</module>
<module>war</module>
+ <module>client</module>
</modules>
<properties>