You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/18 07:43:10 UTC
[incubator-inlong] branch master updated: [INLONG-3202][Manager][SDK] Unify SortSourceConfig for manager and sdk (#3215)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 42f5d9e [INLONG-3202][Manager][SDK] Unify SortSourceConfig for manager and sdk (#3215)
42f5d9e is described below
commit 42f5d9ec1816e145bbe9049472ba8b7eb4878509
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Fri Mar 18 15:43:01 2022 +0800
[INLONG-3202][Manager][SDK] Unify SortSourceConfig for manager and sdk (#3215)
---
.../apache/inlong/common/pojo/sdk/CacheZone.java | 31 +++---
.../inlong/common/pojo/sdk/CacheZoneConfig.java | 27 +++--
.../common/pojo/sdk}/SortSourceConfigResponse.java | 39 ++-----
.../org/apache/inlong/common/pojo/sdk/Topic.java | 27 +++--
.../inlong/manager/service/core/SortService.java | 2 +-
.../manager/service/core/SortSourceService.java | 2 +-
.../manager/service/core/impl/SortServiceImpl.java | 8 +-
.../service/core/impl/SortSourceServiceImpl.java | 4 +-
.../service/core/impl/SortServiceImplTest.java | 2 +-
.../web/controller/openapi/SortController.java | 2 +-
.../apache/inlong/sdk/sort/entity/CacheZone.java | 113 ---------------------
.../inlong/sdk/sort/entity/CacheZoneConfig.java | 61 -----------
.../inlong/sdk/sort/entity/ManagerResponse.java | 70 -------------
.../org/apache/inlong/sdk/sort/entity/Topic.java | 79 --------------
.../sdk/sort/impl/QueryConsumeConfigImpl.java | 22 ++--
15 files changed, 70 insertions(+), 419 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortSourceService.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sdk/CacheZone.java
similarity index 63%
copy from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortSourceService.java
copy to inlong-common/src/main/java/org/apache/inlong/common/pojo/sdk/CacheZone.java
index e1c9304..1d5d171 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortSourceService.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sdk/CacheZone.java
@@ -15,22 +15,25 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.common.pojo.sdk;
-import org.apache.inlong.manager.common.pojo.sort.SortSourceConfigResponse.CacheZone;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import java.util.List;
import java.util.Map;
-/**
- * Sort source service.
- */
-public interface SortSourceService {
-
- /**
- * Get cache zones by cluster name and task name.
- * @param clusterName Target cluster name.
- * @param taskName Target task name.
- * @return SortSourceConfigResponse
- */
- Map<String, CacheZone> getCacheZones(String clusterName, String taskName);
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class CacheZone {
+ String zoneName;
+ String serviceUrl;
+ String authentication;
+ List<Topic> topics;
+ Map<String, String> cacheZoneProperties;
+ String zoneType;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortSourceService.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sdk/CacheZoneConfig.java
similarity index 63%
copy from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortSourceService.java
copy to inlong-common/src/main/java/org/apache/inlong/common/pojo/sdk/CacheZoneConfig.java
index e1c9304..decc811 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortSourceService.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sdk/CacheZoneConfig.java
@@ -15,22 +15,21 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.common.pojo.sdk;
-import org.apache.inlong.manager.common.pojo.sort.SortSourceConfigResponse.CacheZone;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
import java.util.Map;
-/**
- * Sort source service.
- */
-public interface SortSourceService {
-
- /**
- * Get cache zones by cluster name and task name.
- * @param clusterName Target cluster name.
- * @param taskName Target task name.
- * @return SortSourceConfigResponse
- */
- Map<String, CacheZone> getCacheZones(String clusterName, String taskName);
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class CacheZoneConfig {
+ String sortClusterName;
+ String sortTaskId;
+ Map<String, CacheZone> cacheZones;
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/SortSourceConfigResponse.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sdk/SortSourceConfigResponse.java
similarity index 54%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/SortSourceConfigResponse.java
rename to inlong-common/src/main/java/org/apache/inlong/common/pojo/sdk/SortSourceConfigResponse.java
index 4e28095..46f73cd 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/sort/SortSourceConfigResponse.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sdk/SortSourceConfigResponse.java
@@ -15,50 +15,23 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.pojo.sort;
+package org.apache.inlong.common.pojo.sdk;
-import io.swagger.annotations.ApiModel;
+import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
-
-import java.util.List;
-import java.util.Map;
+import lombok.NoArgsConstructor;
@Data
@Builder
-@ApiModel("Sort source sdk config")
+@AllArgsConstructor
+@NoArgsConstructor
public class SortSourceConfigResponse {
String msg;
int code;
String md5;
- SortSourceConfig data;
-
- @Data
- @Builder
- public static class SortSourceConfig {
- String sortClusterName;
- String sortTaskId;
- Map<String, CacheZone> cacheZones;
- }
-
- @Data
- @Builder
- public static class CacheZone {
- String zoneName;
- String serviceUrl;
- String authentication;
- List<Topic> topics;
- Map<String, String> cacheZoneProperties;
- String zoneType;
- }
+ CacheZoneConfig data;
- @Data
- @Builder
- public static class Topic {
- String topic;
- int partitionCnt;
- Map<String, String> topicProperties;
- }
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortSourceService.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sdk/Topic.java
similarity index 63%
copy from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortSourceService.java
copy to inlong-common/src/main/java/org/apache/inlong/common/pojo/sdk/Topic.java
index e1c9304..e337416 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortSourceService.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/pojo/sdk/Topic.java
@@ -15,22 +15,21 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.service.core;
+package org.apache.inlong.common.pojo.sdk;
-import org.apache.inlong.manager.common.pojo.sort.SortSourceConfigResponse.CacheZone;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
import java.util.Map;
-/**
- * Sort source service.
- */
-public interface SortSourceService {
-
- /**
- * Get cache zones by cluster name and task name.
- * @param clusterName Target cluster name.
- * @param taskName Target task name.
- * @return SortSourceConfigResponse
- */
- Map<String, CacheZone> getCacheZones(String clusterName, String taskName);
+@Data
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class Topic {
+ String topic;
+ int partitionCnt;
+ Map<String, String> topicProperties;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
index f5743af..92d3ff3 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortService.java
@@ -18,7 +18,7 @@
package org.apache.inlong.manager.service.core;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
-import org.apache.inlong.manager.common.pojo.sort.SortSourceConfigResponse;
+import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
/**
* Sort Service
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortSourceService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortSourceService.java
index e1c9304..b849f9d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortSourceService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/SortSourceService.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.core;
-import org.apache.inlong.manager.common.pojo.sort.SortSourceConfigResponse.CacheZone;
+import org.apache.inlong.common.pojo.sdk.CacheZone;
import java.util.Map;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
index 08b6566..ce4c607 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortServiceImpl.java
@@ -19,11 +19,11 @@ package org.apache.inlong.manager.service.core.impl;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.common.pojo.sdk.CacheZone;
+import org.apache.inlong.common.pojo.sdk.CacheZoneConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterConfig;
-import org.apache.inlong.manager.common.pojo.sort.SortSourceConfigResponse;
-import org.apache.inlong.manager.common.pojo.sort.SortSourceConfigResponse.CacheZone;
-import org.apache.inlong.manager.common.pojo.sort.SortSourceConfigResponse.SortSourceConfig;
+import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
import org.apache.inlong.manager.dao.entity.SortClusterConfigEntity;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.manager.service.core.SortClusterConfigService;
@@ -160,7 +160,7 @@ public class SortServiceImpl implements SortService {
.build();
}
- SortSourceConfig data = SortSourceConfig.builder()
+ CacheZoneConfig data = CacheZoneConfig.builder()
.sortClusterName(clusterName)
.sortTaskId(sortTaskId)
.cacheZones(cacheZones)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
index 27dcddf..0e5b07a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/SortSourceServiceImpl.java
@@ -19,8 +19,8 @@ package org.apache.inlong.manager.service.core.impl;
import com.google.gson.Gson;
import jodd.util.StringUtil;
-import org.apache.inlong.manager.common.pojo.sort.SortSourceConfigResponse.CacheZone;
-import org.apache.inlong.manager.common.pojo.sort.SortSourceConfigResponse.Topic;
+import org.apache.inlong.common.pojo.sdk.CacheZone;
+import org.apache.inlong.common.pojo.sdk.Topic;
import org.apache.inlong.manager.dao.entity.SortSourceConfigEntity;
import org.apache.inlong.manager.dao.mapper.SortSourceConfigEntityMapper;
import org.apache.inlong.manager.service.core.SortSourceService;
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/SortServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/SortServiceImplTest.java
index 461a35b..9c3f0a3 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/SortServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/SortServiceImplTest.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.core.impl;
-import org.apache.inlong.manager.common.pojo.sort.SortSourceConfigResponse;
+import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
import org.apache.inlong.manager.dao.entity.SortSourceConfigEntity;
import org.apache.inlong.manager.dao.mapper.SortSourceConfigEntityMapper;
import org.apache.inlong.manager.service.ServiceBaseTest;
diff --git a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java
index da2e55f..8797b49 100644
--- a/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java
+++ b/inlong-manager/manager-web/src/main/java/org/apache/inlong/manager/web/controller/openapi/SortController.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.web.controller.openapi;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.apache.inlong.common.pojo.sortstandalone.SortClusterResponse;
-import org.apache.inlong.manager.common.pojo.sort.SortSourceConfigResponse;
+import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
import org.apache.inlong.manager.service.core.SortService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/CacheZone.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/CacheZone.java
deleted file mode 100644
index 2ce74f9..0000000
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/CacheZone.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.sort.entity;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-
-public class CacheZone implements Serializable {
-
- //inlong zoneName
- private String zoneName;
- //mq serviceUrl
- private String serviceUrl;
- private String authentication;
- private List<Topic> topics;
- private Map<String, String> cacheZoneProperties;
- // pulsar,kafka,tube
- private String zoneType;
-
- public String getZoneName() {
- return zoneName;
- }
-
- public void setZoneName(String zoneName) {
- this.zoneName = zoneName;
- }
-
- public String getServiceUrl() {
- return serviceUrl;
- }
-
- public void setServiceUrl(String serviceUrl) {
- this.serviceUrl = serviceUrl;
- }
-
- public String getAuthentication() {
- return authentication;
- }
-
- public void setAuthentication(String authentication) {
- this.authentication = authentication;
- }
-
- public Map<String, String> getCacheZoneProperties() {
- return cacheZoneProperties;
- }
-
- public void setCacheZoneProperties(Map<String, String> cacheZoneProperties) {
- this.cacheZoneProperties = cacheZoneProperties;
- }
-
- public List<Topic> getTopics() {
- return topics;
- }
-
- public void setTopics(List<Topic> topics) {
- this.topics = topics;
- }
-
- public String getZoneType() {
- return zoneType;
- }
-
- public void setZoneType(String zoneType) {
- this.zoneType = zoneType;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- CacheZone cacheZone = (CacheZone) o;
- return zoneName.equals(cacheZone.zoneName);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(zoneName);
- }
-
- @Override
- public String toString() {
- return "CacheZone{"
- + "zoneName='" + zoneName
- + ", serviceUrl='" + serviceUrl
- + ", authentication='" + authentication
- + ", topics=" + topics
- + ", cacheZoneProperties=" + cacheZoneProperties
- + ", zoneType='" + zoneType
- + '}';
- }
-}
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/CacheZoneConfig.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/CacheZoneConfig.java
deleted file mode 100644
index 97e3b83..0000000
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/CacheZoneConfig.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.sort.entity;
-
-import java.io.Serializable;
-import java.util.Map;
-
-public class CacheZoneConfig implements Serializable {
-
- private String sortClusterName;
- private String sortTaskId;
- private Map<String, CacheZone> cacheZones;
-
- public String getSortClusterName() {
- return sortClusterName;
- }
-
- public void setSortClusterName(String sortClusterName) {
- this.sortClusterName = sortClusterName;
- }
-
- public String getSortTaskId() {
- return sortTaskId;
- }
-
- public void setSortTaskId(String sortTaskId) {
- this.sortTaskId = sortTaskId;
- }
-
- public Map<String, CacheZone> getCacheZones() {
- return cacheZones;
- }
-
- public void setCacheZones(Map<String, CacheZone> cacheZones) {
- this.cacheZones = cacheZones;
- }
-
- @Override
- public String toString() {
- return "SortSourceConfig{"
- + "sortClusterName='" + sortClusterName + '\''
- + ", sortTaskId='" + sortTaskId + '\''
- + ", cacheZones=" + cacheZones
- + '}';
- }
-}
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/ManagerResponse.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/ManagerResponse.java
deleted file mode 100644
index a125fe0..0000000
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/ManagerResponse.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.sort.entity;
-
-import java.io.Serializable;
-
-public class ManagerResponse implements Serializable {
-
- private boolean result;
- private int errCode;
- private String md5;
- private CacheZoneConfig data;
-
- public boolean isResult() {
- return result;
- }
-
- public void setResult(boolean result) {
- this.result = result;
- }
-
- public int getErrCode() {
- return errCode;
- }
-
- public void setErrCode(int errCode) {
- this.errCode = errCode;
- }
-
- public String getMd5() {
- return md5;
- }
-
- public void setMd5(String md5) {
- this.md5 = md5;
- }
-
- public CacheZoneConfig getData() {
- return data;
- }
-
- public void setData(CacheZoneConfig data) {
- this.data = data;
- }
-
- @Override
- public String toString() {
- return "Response{"
- + "result=" + result
- + ", errCode=" + errCode
- + ", md5='" + md5 + '\''
- + ", data=" + data
- + '}';
- }
-}
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/Topic.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/Topic.java
deleted file mode 100644
index 90cec8c..0000000
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/entity/Topic.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.sdk.sort.entity;
-
-import java.io.Serializable;
-import java.util.Map;
-import java.util.Objects;
-
-public class Topic implements Serializable {
-
- private String topic;
- private int partitionCnt;
- private Map<String, String> topicProperties;
-
- public String getTopic() {
- return topic;
- }
-
- public void setTopic(String topic) {
- this.topic = topic;
- }
-
- public int getPartitionCnt() {
- return partitionCnt;
- }
-
- public void setPartitionCnt(int partitionCnt) {
- this.partitionCnt = partitionCnt;
- }
-
- public Map<String, String> getTopicProperties() {
- return topicProperties;
- }
-
- public void setTopicProperties(Map<String, String> topicProperties) {
- this.topicProperties = topicProperties;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- Topic topic1 = (Topic) o;
- return topic.equals(topic1.topic);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(topic);
- }
-
- @Override
- public String toString() {
- return "Topic{"
- + "topic='" + topic + '\''
- + ", partitionCnt=" + partitionCnt
- + ", topicProperties=" + topicProperties
- + '}';
- }
-}
diff --git a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
index ee47b74..5265f80 100644
--- a/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
+++ b/inlong-sdk/sort-sdk/src/main/java/org/apache/inlong/sdk/sort/impl/QueryConsumeConfigImpl.java
@@ -31,15 +31,15 @@ import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
+import org.apache.inlong.common.pojo.sdk.CacheZone;
+import org.apache.inlong.common.pojo.sdk.CacheZoneConfig;
+import org.apache.inlong.common.pojo.sdk.SortSourceConfigResponse;
+import org.apache.inlong.common.pojo.sdk.Topic;
import org.apache.inlong.sdk.sort.api.ClientContext;
import org.apache.inlong.sdk.sort.api.QueryConsumeConfig;
-import org.apache.inlong.sdk.sort.entity.CacheZone;
import org.apache.inlong.sdk.sort.entity.CacheZoneCluster;
-import org.apache.inlong.sdk.sort.entity.CacheZoneConfig;
import org.apache.inlong.sdk.sort.entity.ConsumeConfig;
import org.apache.inlong.sdk.sort.entity.InLongTopic;
-import org.apache.inlong.sdk.sort.entity.ManagerResponse;
-import org.apache.inlong.sdk.sort.entity.Topic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,8 +67,8 @@ public class QueryConsumeConfigImpl implements QueryConsumeConfig {
}
// HTTP GET
- private ManagerResponse doGetRequest() throws Exception {
- ManagerResponse managerResponse;
+ private SortSourceConfigResponse doGetRequest() throws Exception {
+ SortSourceConfigResponse managerResponse;
HttpGet request = getHttpGet();
try (CloseableHttpResponse response = httpClient.execute(request)) {
@@ -82,7 +82,7 @@ public class QueryConsumeConfigImpl implements QueryConsumeConfig {
String result = EntityUtils.toString(entity);
logger.debug("response String result:{}", result);
try {
- managerResponse = new Gson().fromJson(result, ManagerResponse.class);
+ managerResponse = new Gson().fromJson(result, SortSourceConfigResponse.class);
return managerResponse;
} catch (Exception e) {
logger.error("parse json to ManagerResponse error:{}", e.getMessage(), e);
@@ -107,12 +107,12 @@ public class QueryConsumeConfigImpl implements QueryConsumeConfig {
public void reload() {
logger.debug("start to reload sort task config.");
try {
- ManagerResponse managerResponse = doGetRequest();
+ SortSourceConfigResponse managerResponse = doGetRequest();
if (managerResponse == null) {
logger.info("## reload managerResponse == null");
return;
}
- if (handleSortTaskConfResult(managerResponse, managerResponse.getErrCode())) {
+ if (handleSortTaskConfResult(managerResponse, managerResponse.getCode())) {
return;
}
} catch (Throwable e) {
@@ -136,7 +136,7 @@ public class QueryConsumeConfigImpl implements QueryConsumeConfig {
* @param respCodeValue int
* @return true/false
*/
- private boolean handleSortTaskConfResult(ManagerResponse response, int respCodeValue) throws Exception {
+ private boolean handleSortTaskConfResult(SortSourceConfigResponse response, int respCodeValue) throws Exception {
switch (respCodeValue) {
case NOUPDATE_VALUE:
logger.debug("manager conf noupdate");
@@ -162,7 +162,7 @@ public class QueryConsumeConfigImpl implements QueryConsumeConfig {
return false;
}
- private void updateSortTaskConf(ManagerResponse response) {
+ private void updateSortTaskConf(SortSourceConfigResponse response) {
CacheZoneConfig cacheZoneConfig = response.getData();
Map<String, List<InLongTopic>> newGroupTopicsMap = new HashMap<>();
for (Map.Entry<String, CacheZone> entry : cacheZoneConfig.getCacheZones().entrySet()) {