You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/23 04:08:26 UTC
[inlong] branch master updated: [INLONG-5160][DataPorxySDK] Update DataProxy cluster API and response parsing method (#5177)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new ed4d33e13 [INLONG-5160][DataPorxySDK] Update DataProxy cluster API and response parsing method (#5177)
ed4d33e13 is described below
commit ed4d33e136d0787af8c47eba54a0a2f0a2a4901a
Author: xueyingzhang <86...@users.noreply.github.com>
AuthorDate: Sat Jul 23 12:08:20 2022 +0800
[INLONG-5160][DataPorxySDK] Update DataProxy cluster API and response parsing method (#5177)
---
.../inlong/sdk/dataproxy/ConfigConstants.java | 2 +
.../inlong/sdk/dataproxy/DefaultMessageSender.java | 263 ++++++++++---------
.../inlong/sdk/dataproxy/ProxyClientConfig.java | 5 +-
.../sdk/dataproxy/config/ProxyClusterConfig.java | 39 +++
.../sdk/dataproxy/config/ProxyConfigEntry.java | 30 +--
.../sdk/dataproxy/config/ProxyConfigManager.java | 284 ++++++++-------------
.../inlong/sdk/dataproxy/network/Sender.java | 6 +-
.../sdk/dataproxy/ProxyConfigManagerTest.java | 55 ++++
.../src/test/resources/proxylist.json | 18 ++
9 files changed, 362 insertions(+), 340 deletions(-)
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
index ead7cffbd..988b50429 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ConfigConstants.java
@@ -67,4 +67,6 @@ public class ConfigConstants {
public static final int FLAG_ALLOW_ENCRYPT = 1 << 6;
public static final int FLAG_ALLOW_COMPRESS = 1 << 5;
+ public static final String MANAGER_DATAPROXY_API = "/api/inlong/manager/openapi/dataproxy/getIpList/";
+
}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
index b31e88e29..e5e3496cd 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/DefaultMessageSender.java
@@ -18,13 +18,6 @@
package org.apache.inlong.sdk.dataproxy;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
@@ -38,12 +31,27 @@ import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
public class DefaultMessageSender implements MessageSender {
- private static final Logger logger = LoggerFactory.getLogger(DefaultMessageSender.class);
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(DefaultMessageSender.class);
private static final long DEFAULT_SEND_TIMEOUT = 100;
private static final TimeUnit DEFAULT_SEND_TIMEUNIT = TimeUnit.MILLISECONDS;
+ private static final ConcurrentHashMap<Integer, DefaultMessageSender> CACHE_SENDER =
+ new ConcurrentHashMap<>();
+ private static final AtomicBoolean MANAGER_FETCHER_THREAD_STARTED = new AtomicBoolean(false);
+ private static ManagerFetcherThread managerFetcherThread;
private final Sender sender;
private final SequentialID idGenerator;
+ private final IndexCollectThread indexCol;
+ /* Store index <groupId_streamId,cnt>*/
+ private final Map<String, Long> storeIndex = new ConcurrentHashMap<String, Long>();
private String groupId;
private int msgtype = ConfigConstants.MSG_TYPE;
private boolean isCompress = true;
@@ -52,16 +60,71 @@ public class DefaultMessageSender implements MessageSender {
private boolean isSupportLF = false;
private int cpsSize = ConfigConstants.COMPRESS_SIZE;
- private static final ConcurrentHashMap<String, DefaultMessageSender> cacheSender =
- new ConcurrentHashMap<>();
+ public DefaultMessageSender(ProxyClientConfig configure) throws Exception {
+ this(configure, null);
+ }
- private final IndexCollectThread indexCol;
- /* Store index <groupId_streamId,cnt>*/
- private final Map<String, Long> storeIndex = new ConcurrentHashMap<String, Long>();
+ public DefaultMessageSender(ProxyClientConfig configure, ThreadFactory selfDefineFactory) throws Exception {
+ ProxyUtils.validClientConfig(configure);
+ sender = new Sender(configure, selfDefineFactory);
+ idGenerator = new SequentialID(Utils.getLocalIp());
+ groupId = configure.getGroupId();
+ indexCol = new IndexCollectThread(storeIndex);
+ indexCol.start();
+
+ if (configure.isEnableSaveManagerVIps()
+ && configure.isLocalVisit()
+ && MANAGER_FETCHER_THREAD_STARTED.compareAndSet(false, true)) {
+ managerFetcherThread = new ManagerFetcherThread(configure);
+ managerFetcherThread.start();
+ }
+ }
+
+ /**
+ * generate by cluster id
+ *
+ * @param configure - sender
+ * @return - sender
+ */
+ public static DefaultMessageSender generateSenderByClusterId(
+ ProxyClientConfig configure) throws Exception {
- private static final AtomicBoolean ManagerFetcherThreadStarted = new AtomicBoolean(false);
+ return generateSenderByClusterId(configure, null);
+ }
- private static ManagerFetcherThread managerFetcherThread;
+ /**
+ * generate by cluster id
+ *
+ * @param configure - sender
+ * @param selfDefineFactory - sender factory
+ * @return - sender
+ */
+ public static DefaultMessageSender generateSenderByClusterId(ProxyClientConfig configure,
+ ThreadFactory selfDefineFactory) throws Exception {
+ ProxyConfigManager proxyConfigManager = new ProxyConfigManager(configure,
+ Utils.getLocalIp(), null);
+ proxyConfigManager.setGroupId(configure.getGroupId());
+ ProxyConfigEntry entry = proxyConfigManager.getGroupIdConfigure();
+ DefaultMessageSender sender = CACHE_SENDER.get(entry.getClusterId());
+ if (sender != null) {
+ return sender;
+ } else {
+ DefaultMessageSender tmpMessageSender =
+ new DefaultMessageSender(configure, selfDefineFactory);
+ CACHE_SENDER.put(entry.getClusterId(), tmpMessageSender);
+ return tmpMessageSender;
+ }
+ }
+
+ /**
+ * finally clean up
+ */
+ public static void finallyCleanup() {
+ for (DefaultMessageSender sender : CACHE_SENDER.values()) {
+ sender.close();
+ }
+ CACHE_SENDER.clear();
+ }
public boolean isSupportLF() {
return isSupportLF;
@@ -119,85 +182,19 @@ public class DefaultMessageSender implements MessageSender {
this.groupId = groupId;
}
- public DefaultMessageSender(ProxyClientConfig configure) throws Exception {
- this(configure, null);
- }
-
- public DefaultMessageSender(ProxyClientConfig configure, ThreadFactory selfDefineFactory) throws Exception {
- ProxyUtils.validClientConfig(configure);
- sender = new Sender(configure, selfDefineFactory);
- idGenerator = new SequentialID(Utils.getLocalIp());
- groupId = configure.getGroupId();
- indexCol = new IndexCollectThread(storeIndex);
- indexCol.start();
-
- if (configure.isEnableSaveManagerVIps()
- && configure.isLocalVisit()
- && ManagerFetcherThreadStarted.compareAndSet(false, true)) {
- managerFetcherThread = new ManagerFetcherThread(configure);
- managerFetcherThread.start();
- }
- }
-
- /**
- * generate by cluster id
- *
- * @param configure - sender
- * @return - sender
- */
- public static DefaultMessageSender generateSenderByClusterId(
- ProxyClientConfig configure) throws Exception {
-
- return generateSenderByClusterId(configure, null);
- }
-
- /**
- * generate by cluster id
- *
- * @param configure - sender
- * @param selfDefineFactory - sender factory
- * @return - sender
- */
- public static DefaultMessageSender generateSenderByClusterId(ProxyClientConfig configure,
- ThreadFactory selfDefineFactory) throws Exception {
- ProxyConfigManager proxyConfigManager = new ProxyConfigManager(configure,
- Utils.getLocalIp(), null);
- proxyConfigManager.setGroupId(configure.getGroupId());
- ProxyConfigEntry entry = proxyConfigManager.getGroupIdConfigure();
- DefaultMessageSender sender = cacheSender.get(entry.getClusterId());
- if (sender != null) {
- return sender;
- } else {
- DefaultMessageSender tmpMessageSender =
- new DefaultMessageSender(configure, selfDefineFactory);
- cacheSender.put(entry.getClusterId(), tmpMessageSender);
- return tmpMessageSender;
- }
- }
-
- /**
- * finally clean up
- */
- public static void finallyCleanup() {
- for (DefaultMessageSender sender : cacheSender.values()) {
- sender.close();
- }
- cacheSender.clear();
- }
-
public String getSDKVersion() {
return ConfigConstants.PROXY_SDK_VERSION;
}
@Deprecated
public SendResult sendMessage(byte[] body, String attributes, String msgUUID,
- long timeout, TimeUnit timeUnit) {
+ long timeout, TimeUnit timeUnit) {
return sender.syncSendMessage(new EncodeObject(body, attributes,
idGenerator.getNextId()), msgUUID, timeout, timeUnit);
}
public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit) {
+ long timeout, TimeUnit timeUnit) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES;
@@ -227,7 +224,7 @@ public class DefaultMessageSender implements MessageSender {
}
public SendResult sendMessage(byte[] body, String groupId, String streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
+ long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
@@ -261,7 +258,7 @@ public class DefaultMessageSender implements MessageSender {
}
public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit) {
+ long timeout, TimeUnit timeUnit) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES;
@@ -289,10 +286,10 @@ public class DefaultMessageSender implements MessageSender {
}
public SendResult sendMessage(List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
+ long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
- || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
+ || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
return SendResult.INVALID_ATTRIBUTES;
}
addIndexCnt(groupId, streamId, bodyList.size());
@@ -321,14 +318,14 @@ public class DefaultMessageSender implements MessageSender {
@Deprecated
public void asyncSendMessage(SendMessageCallback callback, byte[] body, String attributes, String msgUUID,
- long timeout, TimeUnit timeUnit) throws ProxysdkException {
+ long timeout, TimeUnit timeUnit) throws ProxysdkException {
sender.asyncSendMessage(new EncodeObject(body, attributes, idGenerator.getNextId()),
callback, msgUUID, timeout, timeUnit);
}
public void asyncSendMessage(SendMessageCallback callback, byte[] body,
- String groupId, String streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit) throws ProxysdkException {
+ String groupId, String streamId, long dt, String msgUUID,
+ long timeout, TimeUnit timeUnit) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -345,8 +342,8 @@ public class DefaultMessageSender implements MessageSender {
} else if (msgtype == 3 || msgtype == 5) {
if (isCompressEnd) {
sender.asyncSendMessage(new EncodeObject(body, "groupId="
- + groupId + "&streamId=" + streamId + "&dt=" + dt + "&cp=snappy",
- idGenerator.getNextId(), this.getMsgtype(), true, groupId),
+ + groupId + "&streamId=" + streamId + "&dt=" + dt + "&cp=snappy",
+ idGenerator.getNextId(), this.getMsgtype(), true, groupId),
callback, msgUUID, timeout, timeUnit);
} else {
sender.asyncSendMessage(
@@ -360,9 +357,9 @@ public class DefaultMessageSender implements MessageSender {
}
public void asyncSendMessage(SendMessageCallback callback,
- byte[] body, String groupId, String streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit,
- Map<String, String> extraAttrMap) throws ProxysdkException {
+ byte[] body, String groupId, String streamId, long dt, String msgUUID,
+ long timeout, TimeUnit timeUnit,
+ Map<String, String> extraAttrMap) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -393,8 +390,8 @@ public class DefaultMessageSender implements MessageSender {
}
public void asyncSendMessage(SendMessageCallback callback, List<byte[]> bodyList,
- String groupId, String streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit) throws ProxysdkException {
+ String groupId, String streamId, long dt, String msgUUID,
+ long timeout, TimeUnit timeUnit) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)) {
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -424,12 +421,12 @@ public class DefaultMessageSender implements MessageSender {
}
public void asyncSendMessage(SendMessageCallback callback,
- List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
- long timeout, TimeUnit timeUnit,
- Map<String, String> extraAttrMap) throws ProxysdkException {
+ List<byte[]> bodyList, String groupId, String streamId, long dt, String msgUUID,
+ long timeout, TimeUnit timeUnit,
+ Map<String, String> extraAttrMap) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
- || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
+ || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
addIndexCnt(groupId, streamId, bodyList.size());
@@ -459,11 +456,11 @@ public class DefaultMessageSender implements MessageSender {
/**
* asyncSendMessage
- *
- * @param inlongGroupId
- * @param inlongStreamId
- * @param body
- * @param callback
+ *
+ * @param inlongGroupId
+ * @param inlongStreamId
+ * @param body
+ * @param callback
* @throws ProxysdkException
*/
@Override
@@ -475,11 +472,11 @@ public class DefaultMessageSender implements MessageSender {
/**
* asyncSendMessage
- *
- * @param inlongGroupId
- * @param inlongStreamId
- * @param bodyList
- * @param callback
+ *
+ * @param inlongGroupId
+ * @param inlongStreamId
+ * @param bodyList
+ * @param callback
* @throws ProxysdkException
*/
@Override
@@ -499,17 +496,17 @@ public class DefaultMessageSender implements MessageSender {
storeIndex.put(key, cnt);
}
} catch (Exception e) {
- logger.error(e.getMessage());
+ LOGGER.error(e.getMessage());
}
}
public void asyncsendMessageData(FileCallback callback, List<byte[]> bodyList, String groupId,
- String streamId, long dt, int sid, boolean isSupportLF, String msgUUID,
- long timeout, TimeUnit timeUnit,
- Map<String, String> extraAttrMap) throws ProxysdkException {
+ String streamId, long dt, int sid, boolean isSupportLF, String msgUUID,
+ long timeout, TimeUnit timeUnit,
+ Map<String, String> extraAttrMap) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
- || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
+ || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
}
addIndexCnt(groupId, streamId, bodyList.size());
@@ -526,8 +523,8 @@ public class DefaultMessageSender implements MessageSender {
}
private void asyncSendMetric(FileCallback callback, byte[] body, String groupId,
- String streamId, long dt, int sid, String ip, String msgUUID,
- long timeout, TimeUnit timeUnit, String messageKey) throws ProxysdkException {
+ String streamId, long dt, int sid, String ip, String msgUUID,
+ long timeout, TimeUnit timeUnit, String messageKey) throws ProxysdkException {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
@@ -541,23 +538,23 @@ public class DefaultMessageSender implements MessageSender {
}
public void asyncsendMessageProxy(FileCallback callback, byte[] body, String groupId, String streamId,
- long dt, int sid, String ip, String msgUUID,
- long timeout, TimeUnit timeUnit) throws ProxysdkException {
+ long dt, int sid, String ip, String msgUUID,
+ long timeout, TimeUnit timeUnit) throws ProxysdkException {
asyncSendMetric(callback, body, groupId, streamId, dt, sid, ip, msgUUID, timeout, timeUnit, "minute");
}
public void asyncsendMessageFile(FileCallback callback, byte[] body, String groupId,
- String streamId, long dt, int sid, String msgUUID,
- long timeout, TimeUnit timeUnit) throws ProxysdkException {
+ String streamId, long dt, int sid, String msgUUID,
+ long timeout, TimeUnit timeUnit) throws ProxysdkException {
asyncSendMetric(callback, body, groupId, streamId, dt, sid, "", msgUUID, timeout, timeUnit, "file");
}
public String sendMessageData(List<byte[]> bodyList, String groupId,
- String streamId, long dt, int sid, boolean isSupportLF, String msgUUID,
- long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
+ String streamId, long dt, int sid, boolean isSupportLF, String msgUUID,
+ long timeout, TimeUnit timeUnit, Map<String, String> extraAttrMap) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(bodyList) || !ProxyUtils.isDtValid(dt)
- || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
+ || !ProxyUtils.isAttrKeysValid(extraAttrMap)) {
return SendResult.INVALID_ATTRIBUTES.toString();
}
addIndexCnt(groupId, streamId, bodyList.size());
@@ -575,7 +572,7 @@ public class DefaultMessageSender implements MessageSender {
}
private String sendMetric(byte[] body, String groupId, String streamId, long dt, int sid, String ip, String msgUUID,
- long timeout, TimeUnit timeUnit, String messageKey) {
+ long timeout, TimeUnit timeUnit, String messageKey) {
dt = ProxyUtils.covertZeroDt(dt);
if (!ProxyUtils.isBodyValid(body) || !ProxyUtils.isDtValid(dt)) {
return SendResult.INVALID_ATTRIBUTES.toString();
@@ -589,26 +586,26 @@ public class DefaultMessageSender implements MessageSender {
}
public String sendMessageProxy(byte[] body, String groupId, String streamId,
- long dt, int sid, String ip, String msgUUID,
- long timeout, TimeUnit timeUnit) {
+ long dt, int sid, String ip, String msgUUID,
+ long timeout, TimeUnit timeUnit) {
return sendMetric(body, groupId, streamId, dt, sid, ip, msgUUID, timeout, timeUnit, "minute");
}
public String sendMessageFile(byte[] body, String groupId, String streamId, long dt, int sid, String msgUUID,
- long timeout, TimeUnit timeUnit) {
+ long timeout, TimeUnit timeUnit) {
return sendMetric(body, groupId, streamId, dt, sid, "", msgUUID, timeout, timeUnit, "file");
}
private void shutdownInternalThreads() {
indexCol.shutDown();
managerFetcherThread.shutdown();
- ManagerFetcherThreadStarted.set(false);
+ MANAGER_FETCHER_THREAD_STARTED.set(false);
}
public void close() {
- logger.info("ready to close resources, may need five minutes !");
- if (sender.getClusterId() != null) {
- cacheSender.remove(sender.getClusterId());
+ LOGGER.info("ready to close resources, may need five minutes !");
+ if (sender.getClusterId() != -1) {
+ CACHE_SENDER.remove(sender.getClusterId());
}
sender.close();
shutdownInternalThreads();
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
index e6516f35f..d68eba7f8 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/ProxyClientConfig.java
@@ -100,8 +100,11 @@ public class ProxyClientConfig {
if (Utils.isBlank(managerIp)) {
throw new IllegalArgumentException("managerIp is Blank!");
}
+ if (Utils.isBlank(groupId)) {
+ throw new ProxysdkException("groupId is blank!");
+ }
this.proxyIPServiceURL =
- "http://" + managerIp + ":" + managerPort + "/api/inlong/manager/openapi/dataproxy/getIpList";
+ "http://" + managerIp + ":" + managerPort + ConfigConstants.MANAGER_DATAPROXY_API + groupId;
this.groupId = groupId;
this.netTag = netTag;
this.isLocalVisit = isLocalVisit;
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyClusterConfig.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyClusterConfig.java
new file mode 100644
index 000000000..a9a347d2e
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyClusterConfig.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy.config;
+
+import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
+
+public class ProxyClusterConfig {
+ private boolean success;
+ private String errMsg;
+ private DataProxyNodeResponse data;
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public String getErrMsg() {
+ return errMsg;
+ }
+
+ public DataProxyNodeResponse getData() {
+ return data;
+ }
+
+}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
index 5662e2b37..c975fafa4 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigEntry.java
@@ -21,12 +21,10 @@ package org.apache.inlong.sdk.dataproxy.config;
import java.util.Map;
public class ProxyConfigEntry implements java.io.Serializable {
- private String clusterId;
+ private int clusterId;
private String groupId;
private int size;
private Map<String, HostInfo> hostMap;
- private int groupIdNum;
- private Map<String, Integer> streamIdNumMap;
private int load;
private int switchStat;
private boolean isInterVisit;
@@ -39,19 +37,6 @@ public class ProxyConfigEntry implements java.io.Serializable {
this.load = load;
}
- public int getGroupIdNum() {
- return groupIdNum;
- }
-
- public Map<String, Integer> getStreamIdNumMap() {
- return streamIdNumMap;
- }
-
- public void setGroupIdNumAndStreamIdNumMap(int groupIdNum, Map<String, Integer> streamIdNumMap) {
- this.groupIdNum = groupIdNum;
- this.streamIdNumMap = streamIdNumMap;
- }
-
public int getSwitchStat() {
return switchStat;
}
@@ -73,6 +58,10 @@ public class ProxyConfigEntry implements java.io.Serializable {
return size;
}
+ public void setSize(int size) {
+ this.size = size;
+ }
+
public String getGroupId() {
return groupId;
}
@@ -91,17 +80,16 @@ public class ProxyConfigEntry implements java.io.Serializable {
@Override
public String toString() {
- return "ProxyConfigEntry [hostMap=" + hostMap + ", load=" + load + ", bsn="
- + groupIdNum + ", tsnMap=" + streamIdNumMap
- + ", size=" + size + ", isInterVisit=" + isInterVisit + ", groupId=" + groupId
+ return "ProxyConfigEntry [hostMap=" + hostMap + ", load=" + load + ", size=" + size + ", isInterVisit="
+ + isInterVisit + ", groupId=" + groupId
+ ", switch=" + switchStat + "]";
}
- public String getClusterId() {
+ public int getClusterId() {
return clusterId;
}
- public void setClusterId(String clusterId) {
+ public void setClusterId(int clusterId) {
this.clusterId = clusterId;
}
}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
index 8dc833345..e96bd06c0 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/config/ProxyConfigManager.java
@@ -24,7 +24,11 @@ import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import com.google.gson.stream.JsonReader;
import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.ObjectUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.http.Header;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
@@ -41,6 +45,8 @@ import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
import org.apache.http.ssl.SSLContexts;
import org.apache.http.util.EntityUtils;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeInfo;
+import org.apache.inlong.common.pojo.dataproxy.DataProxyNodeResponse;
import org.apache.inlong.sdk.dataproxy.ConfigConstants;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
@@ -82,7 +88,7 @@ import static org.apache.inlong.sdk.dataproxy.ConfigConstants.REQUEST_HEADER_AUT
public class ProxyConfigManager extends Thread {
public static final String APPLICATION_JSON = "application/json";
- private static final Logger logger = LoggerFactory.getLogger(ProxyConfigManager.class);
+ private static final Logger LOGGER = LoggerFactory.getLogger(ProxyConfigManager.class);
private final ProxyClientConfig clientConfig;
private final String localIP;
private final ClientMgr clientManager;
@@ -113,7 +119,7 @@ public class ProxyConfigManager extends Thread {
}
public void shutDown() {
- logger.info("Begin to shut down ProxyConfigManager!");
+ LOGGER.info("Begin to shut down ProxyConfigManager!");
bShutDown = true;
}
@@ -123,9 +129,9 @@ public class ProxyConfigManager extends Thread {
try {
doProxyEntryQueryWork();
updateEncryptConfigEntry();
- logger.info("ProxyConf update!");
+ LOGGER.info("ProxyConf update!");
} catch (Throwable e) {
- logger.error("Refresh proxy ip list runs into exception {}, {}", e.toString(), e.getStackTrace());
+ LOGGER.error("Refresh proxy ip list runs into exception {}, {}", e.toString(), e.getStackTrace());
e.printStackTrace();
}
@@ -138,13 +144,13 @@ public class ProxyConfigManager extends Thread {
if (proxyUpdateIntervalSec > 5) {
sleepTimeSec = proxyUpdateIntervalSec + random.nextInt() % (proxyUpdateIntervalSec / 5);
}
- logger.info("sleep time {}", sleepTimeSec);
+ LOGGER.info("sleep time {}", sleepTimeSec);
Thread.sleep(sleepTimeSec * 1000);
} catch (Throwable e2) {
//
}
}
- logger.info("ProxyConfigManager worker existed!");
+ LOGGER.info("ProxyConfigManager worker existed!");
}
/**
@@ -161,11 +167,11 @@ public class ProxyConfigManager extends Thread {
if (diffTime < clientConfig.getMaxProxyCacheTimeInMs()) {
JsonReader reader = new JsonReader(new FileReader(configCachePath));
ProxyConfigEntry proxyConfigEntry = gson.fromJson(reader, ProxyConfigEntry.class);
- logger.info("{} has a backup! {}", groupId, proxyConfigEntry);
+ LOGGER.info("{} has a backup! {}", groupId, proxyConfigEntry);
return proxyConfigEntry;
}
} catch (Exception ex) {
- logger.warn("try to read local cache, caught {}", ex.getMessage());
+ LOGGER.warn("try to read local cache, caught {}", ex.getMessage());
} finally {
rw.readLock().unlock();
}
@@ -180,13 +186,13 @@ public class ProxyConfigManager extends Thread {
// try to create parent
file.getParentFile().mkdirs();
}
- logger.info("try to write {}} to local cache {}", entry, configCachePath);
+ LOGGER.info("try to write {}} to local cache {}", entry, configCachePath);
FileWriter fileWriter = new FileWriter(configCachePath);
gson.toJson(entry, fileWriter);
fileWriter.flush();
fileWriter.close();
} catch (Exception ex) {
- logger.warn("try to write local cache, caught {}", ex.getMessage());
+ LOGGER.warn("try to write local cache, caught {}", ex.getMessage());
} finally {
rw.writeLock().unlock();
}
@@ -196,7 +202,7 @@ public class ProxyConfigManager extends Thread {
try {
return requestProxyList(this.clientConfig.getProxyIPServiceURL());
} catch (Exception e) {
- logger.warn("try to request proxy list by http, caught {}", e.getMessage());
+ LOGGER.warn("try to request proxy list by http, caught {}", e.getMessage());
}
return null;
}
@@ -271,7 +277,7 @@ public class ProxyConfigManager extends Thread {
}
/* We should exit if no local IP list and can't request it from manager.*/
if (localMd5 == null && proxyEntry == null) {
- logger.error("Can't connect manager at the start of proxy API {}",
+ LOGGER.error("Can't connect manager at the start of proxy API {}",
this.clientConfig.getProxyIPServiceURL());
proxyEntry = tryToReadCacheProxyEntry(configAddr);
}
@@ -281,7 +287,7 @@ public class ProxyConfigManager extends Thread {
s.append(tmp.getHostName()).append(";").append(tmp.getPortNumber())
.append(",");
}
- logger.warn("Backup proxyEntry [{}]", s);
+ LOGGER.warn("Backup proxyEntry [{}]", s);
}
}
if (localMd5 == null && proxyEntry == null && proxyInfoList == null) {
@@ -302,12 +308,10 @@ public class ProxyConfigManager extends Thread {
*/
private void compareProxyList(ProxyConfigEntry proxyEntry) {
if (proxyEntry != null) {
- logger.info("{}", proxyEntry.toString());
+ LOGGER.info("{}", proxyEntry.toString());
if (proxyEntry.getSize() != 0) {
/* Initialize the current proxy information list first. */
clientManager.setLoadThreshold(proxyEntry.getLoad());
- clientManager.setGroupIdNum(proxyEntry.getGroupIdNum());
- clientManager.setStreamIdMap(proxyEntry.getStreamIdNumMap());
List<HostInfo> newProxyInfoList = new ArrayList<HostInfo>();
for (Map.Entry<String, HostInfo> entry : proxyEntry.getHostMap().entrySet()) {
@@ -318,7 +322,7 @@ public class ProxyConfigManager extends Thread {
String oldMd5 = calcHostInfoMd5(proxyInfoList);
if (newMd5 != null && !newMd5.equals(oldMd5)) {
/* Choose random alive connections to send messages. */
- logger.info("old md5 {} new md5 {}", oldMd5, newMd5);
+ LOGGER.info("old md5 {} new md5 {}", oldMd5, newMd5);
proxyInfoList.clear();
proxyInfoList = newProxyInfoList;
clientManager.setProxyInfoList(proxyInfoList);
@@ -327,19 +331,19 @@ public class ProxyConfigManager extends Thread {
/*judge cluster's switch state*/
oldStat = proxyEntry.getSwitchStat();
if ((System.currentTimeMillis() - doworkTime) > 3 * 60 * 1000) {
- logger.info("switch the cluster!");
+ LOGGER.info("switch the cluster!");
proxyInfoList.clear();
proxyInfoList = newProxyInfoList;
clientManager.setProxyInfoList(proxyInfoList);
} else {
- logger.info("only change oldStat ");
+ LOGGER.info("only change oldStat ");
}
} else {
newProxyInfoList.clear();
- logger.info("proxy IP list doesn't change, load {}", proxyEntry.getLoad());
+ LOGGER.info("proxy IP list doesn't change, load {}", proxyEntry.getLoad());
}
} else {
- logger.error("proxyEntry's size is zero");
+ LOGGER.error("proxyEntry's size is zero");
}
}
}
@@ -410,7 +414,7 @@ public class ProxyConfigManager extends Thread {
private EncryptConfigEntry getStoredPubKeyEntry(String userName) {
if (Utils.isBlank(userName)) {
- logger.warn(" userName(" + userName + ") is not available");
+ LOGGER.warn(" userName(" + userName + ") is not available");
return null;
}
EncryptConfigEntry entry;
@@ -430,7 +434,7 @@ public class ProxyConfigManager extends Thread {
return null;
}
} catch (Throwable e1) {
- logger.error("Read " + userName + " stored PubKeyEntry error ", e1);
+ LOGGER.error("Read " + userName + " stored PubKeyEntry error ", e1);
return null;
} finally {
if (fis != null) {
@@ -462,7 +466,7 @@ public class ProxyConfigManager extends Thread {
p.flush();
//p.close();
} catch (Throwable e) {
- logger.error("store EncryptConfigEntry " + entry.toString() + " exception ", e);
+ LOGGER.error("store EncryptConfigEntry " + entry.toString() + " exception ", e);
e.printStackTrace();
} finally {
if (fos != null) {
@@ -497,29 +501,34 @@ public class ProxyConfigManager extends Thread {
private EncryptConfigEntry requestPubKey(String pubKeyUrl, String userName, boolean needGet) {
if (Utils.isBlank(userName)) {
- logger.error("Queried userName is null!");
+ LOGGER.error("Queried userName is null!");
return null;
}
List<BasicNameValuePair> params = new ArrayList<BasicNameValuePair>();
params.add(new BasicNameValuePair("operation", "query"));
params.add(new BasicNameValuePair("username", userName));
- JsonObject pubKeyConf = requestConfiguration(pubKeyUrl, params);
+ String returnStr = requestConfiguration(pubKeyUrl, params);
+ if (Utils.isBlank(returnStr)) {
+ LOGGER.info("No public key information returned from manager");
+ return null;
+ }
+ JsonObject pubKeyConf = jsonParser.parse(returnStr).getAsJsonObject();
if (pubKeyConf == null) {
- logger.info("No public key information returned from manager");
+ LOGGER.info("No public key information returned from manager");
return null;
}
if (!pubKeyConf.has("resultCode")) {
- logger.info("Parse pubKeyConf failure: No resultCode key information returned from manager");
+ LOGGER.info("Parse pubKeyConf failure: No resultCode key information returned from manager");
return null;
}
int resultCode = pubKeyConf.get("resultCode").getAsInt();
if (resultCode != 0) {
- logger.info("query pubKeyConf failure, error code is " + resultCode + ", errInfo is "
+ LOGGER.info("query pubKeyConf failure, error code is " + resultCode + ", errInfo is "
+ pubKeyConf.get("message").getAsString());
return null;
}
if (!pubKeyConf.has("resultData")) {
- logger.info("Parse pubKeyConf failure: No resultData key information returned from manager");
+ LOGGER.info("Parse pubKeyConf failure: No resultData key information returned from manager");
return null;
}
JsonObject resultData = pubKeyConf.get("resultData").getAsJsonObject();
@@ -541,78 +550,20 @@ public class ProxyConfigManager extends Thread {
return null;
}
- private ProxyConfigEntry getLocalProxyListFromFile(String filePath) throws Exception {
- JsonObject localProxyAddrJson;
+ public ProxyConfigEntry getLocalProxyListFromFile(String filePath) throws Exception {
+ DataProxyNodeResponse proxyCluster;
try {
byte[] fileBytes = Files.readAllBytes(Paths.get(filePath));
- localProxyAddrJson = jsonParser.parse(new String(fileBytes)).getAsJsonObject();
+ proxyCluster = gson.fromJson(new String(fileBytes), DataProxyNodeResponse.class);
} catch (Throwable e) {
throw new Exception("Read local proxyList File failure by " + filePath + ", reason is " + e.getCause());
}
-
- int groupIdNum = 0;
- if (localProxyAddrJson.has("bsn")) {
- groupIdNum = localProxyAddrJson.get("bsn").getAsInt();
+ if (ObjectUtils.isEmpty(proxyCluster)) {
+ LOGGER.warn("no proxyCluster configure from local file");
+ return null;
}
- int load = ConfigConstants.LOAD_THRESHOLD;
- if (localProxyAddrJson.has("load")) {
- int inLoad = localProxyAddrJson.get("load").getAsInt();
- load = inLoad > 200 ? 200 : (Math.max(inLoad, 0));
- }
- ProxyConfigEntry proxyEntry = new ProxyConfigEntry();
- proxyEntry.setGroupId(clientConfig.getGroupId());
- boolean isInterVisit = checkValidProxy(filePath, localProxyAddrJson);
- proxyEntry.setInterVisit(isInterVisit);
- Map<String, HostInfo> hostMap = getHostInfoMap(
- localProxyAddrJson);
- proxyEntry.setHostMap(hostMap);
- proxyEntry.setSwitchStat(0);
- Map<String, Integer> streamIdMap = getStreamIdMap(localProxyAddrJson);
- proxyEntry.setGroupIdNumAndStreamIdNumMap(groupIdNum, streamIdMap);
- proxyEntry.setLoad(load);
- if (localProxyAddrJson.has("clusterId")) {
- proxyEntry.setClusterId(localProxyAddrJson.get("clusterId").getAsString());
- }
- return proxyEntry;
- }
-
- private Map<String, HostInfo> getHostInfoMap(JsonObject localProxyAddrJson)
- throws Exception {
- Map<String, HostInfo> hostMap = new HashMap<String, HostInfo>();
- JsonArray jsonHostList = localProxyAddrJson.get("address").getAsJsonArray();
- if (jsonHostList == null) {
- throw new Exception("Parse local proxyList failure: address field is not exist!");
- }
- for (int i = 0; i < jsonHostList.size(); i++) {
- JsonObject jsonItem = jsonHostList.get(i).getAsJsonObject();
- if (jsonItem != null) {
- if (!jsonItem.has("port")) {
- throw new Exception("Parse local proxyList failure: "
- + "port field is not exist in address(" + i + ")!");
- }
- int port = jsonItem.get("port").getAsInt();
- if (port <= 0) {
- throw new Exception("Parse local proxyList failure: "
- + "port value <= 0 in address(" + i + ")!");
- }
- if (!jsonItem.has("host")) {
- throw new Exception("Parse local proxyList failure: "
- + "host field is not exist in address(" + i + ")!");
- }
- String hostItem = jsonItem.get("host").getAsString();
- if (Utils.isBlank(hostItem)) {
- throw new Exception("Parse local proxyList failure: "
- + "host value is blank in address(" + i + ")!");
- }
- String refId = hostItem + ":" + String.valueOf(port);
- hostMap.put(refId, new HostInfo(refId, hostItem, port));
- }
- }
- if (hostMap.isEmpty()) {
- throw new Exception("Parse local proxyList failure: address is empty !");
- }
- return hostMap;
+ return getProxyConfigEntry(proxyCluster);
}
private Map<String, Integer> getStreamIdMap(JsonObject localProxyAddrJson) {
@@ -629,106 +580,76 @@ public class ProxyConfigManager extends Thread {
return streamIdMap;
}
- private boolean checkValidProxy(String filePath, JsonObject localProxyAddrJson) throws Exception {
- if (localProxyAddrJson == null) {
- throw new Exception("Read local proxyList File failure by " + filePath + ", reason is content is null!");
- }
- if (!localProxyAddrJson.has("size")) {
- throw new Exception("Parse local proxyList failure: size field is not exist!");
- }
- int size = localProxyAddrJson.get("size").getAsInt();
- if (size == 0) {
- throw new Exception("Parse local proxyList failure: proxy list size = 0!");
- }
- boolean isInterVisit = false;
- if (localProxyAddrJson.has("isInterVisit")) {
- isInterVisit = localProxyAddrJson.get("isInterVisit").getAsInt() != 0;
- }
- return isInterVisit;
- }
-
public ProxyConfigEntry requestProxyList(String url) {
ArrayList<BasicNameValuePair> params = new ArrayList<BasicNameValuePair>();
params.add(new BasicNameValuePair("extTag", clientConfig.getNetTag()));
params.add(new BasicNameValuePair("ip", this.localIP));
- logger.info("Begin to get configure from manager {}, param is {}", url, params);
+ LOGGER.info("Begin to get configure from manager {}, param is {}", url, params);
- JsonObject jsonRes = requestConfiguration(url, params);
- if (jsonRes == null) {
+ String resultStr = requestConfiguration(url, params);
+ ProxyClusterConfig clusterConfig = gson.fromJson(resultStr, ProxyClusterConfig.class);
+ if (clusterConfig == null || !clusterConfig.isSuccess() || clusterConfig.getData() == null) {
return null;
}
- Map<String, HostInfo> hostMap = formatHostInfoMap(
- jsonRes);
+ DataProxyNodeResponse proxyCluster = clusterConfig.getData();
+ return getProxyConfigEntry(proxyCluster);
+ }
- if (hostMap == null) {
+ private ProxyConfigEntry getProxyConfigEntry(DataProxyNodeResponse proxyCluster) {
+ List<DataProxyNodeInfo> nodeList = proxyCluster.getNodeList();
+ if (CollectionUtils.isEmpty(nodeList)) {
+ LOGGER.error("dataproxy nodeList is empty in DataProxyNodeResponse!");
return null;
}
- int groupIdNum = 0;
- if (jsonRes.has("bsn")) {
- groupIdNum = jsonRes.get("bsn").getAsInt();
+ Map<String, HostInfo> hostMap = formatHostInfoMap(nodeList);
+ if (MapUtils.isEmpty(hostMap)) {
+ return null;
+ }
+ // Each optional field below keeps its default when the response omits it.
+ int clusterId = -1;
+ if (ObjectUtils.isNotEmpty(proxyCluster.getClusterId())) {
+ clusterId = proxyCluster.getClusterId();
}
int load = ConfigConstants.LOAD_THRESHOLD;
- if (jsonRes.has("load")) {
- int inLoad = jsonRes.get("load").getAsInt();
- load = inLoad > 200 ? 200 : (Math.max(inLoad, 0));
+ if (ObjectUtils.isNotEmpty(proxyCluster.getLoad())) {
+ load = proxyCluster.getLoad() > 200 ? 200 : (Math.max(proxyCluster.getLoad(), 0));
+ }
+ boolean isIntranet = true;
+ if (ObjectUtils.isNotEmpty(proxyCluster.getIsIntranet())) { // guard the field that is read, not isSwitch
+ isIntranet = proxyCluster.getIsIntranet() == 1;
+ }
+ int isSwitch = 0;
+ if (ObjectUtils.isNotEmpty(proxyCluster.getIsSwitch())) {
+ isSwitch = proxyCluster.getIsSwitch();
}
+
ProxyConfigEntry proxyEntry = new ProxyConfigEntry();
+ proxyEntry.setClusterId(clusterId);
proxyEntry.setGroupId(clientConfig.getGroupId());
- proxyEntry.setInterVisit(true);
+ proxyEntry.setInterVisit(isIntranet);
proxyEntry.setHostMap(hostMap);
- proxyEntry.setSwitchStat(0);
- Map<String, Integer> streamIdMap = getStreamIdMap(jsonRes);
- proxyEntry.setGroupIdNumAndStreamIdNumMap(groupIdNum, streamIdMap);
+ proxyEntry.setSwitchStat(isSwitch);
proxyEntry.setLoad(load);
- if (jsonRes.has("data")) {
- JsonArray data = jsonRes.getAsJsonArray("data");
- if (data != null) {
- String id = data.get(0).getAsJsonObject().get("id").getAsString();
- proxyEntry.setClusterId(id);
- }
- }
+ proxyEntry.setSize(nodeList.size());
return proxyEntry;
}
- private Map<String, HostInfo> formatHostInfoMap(JsonObject jsonRes) {
- Map<String, HostInfo> hostMap = new HashMap<String, HostInfo>();
- JsonArray jsonHostList = jsonRes.getAsJsonArray("data");
- if (jsonHostList == null) {
- logger.info("Parse proxyList failure: address field is not exist for response from manager!");
- return null;
- }
- for (int i = 0; i < jsonHostList.size(); i++) {
- JsonObject jsonItem = jsonHostList.get(i).getAsJsonObject();
- if (jsonItem != null) {
- if (!jsonItem.has("port")) {
- logger.error("Parse proxyList failure: port field is not exist in address("
- + i + ") for response from manager!");
- return null;
- }
- int port = jsonItem.get("port").getAsInt();
- if (port <= 0) {
- logger.info("Parse proxyList failure: port value <= 0 in address("
- + i + ") for response from manager!");
- return null;
- }
- if (!jsonItem.has("ip")) {
- logger.error("Parse proxyList failure: host field is not exist in address("
- + i + ") for response from manager!");
- return null;
- }
- String hostItem = jsonItem.get("ip").getAsString();
- if (Utils.isBlank(hostItem)) {
- logger.error("Parse proxyList failure: host value is blank in address("
- + i + ") for response from manager!");
- return null;
- }
- String refId = hostItem + ":" + String.valueOf(port);
- hostMap.put(refId, new HostInfo(refId, hostItem, port));
+ private Map<String, HostInfo> formatHostInfoMap(List<DataProxyNodeInfo> nodeList) {
+ Map<String, HostInfo> hostMap = new HashMap<>();
+ for (DataProxyNodeInfo proxy : nodeList) {
+ if (ObjectUtils.isEmpty(proxy.getId()) || StringUtils.isEmpty(proxy.getIp()) || ObjectUtils
+ .isEmpty(proxy.getPort()) || proxy.getPort() < 0) {
+ LOGGER.error("invalid proxy node, id:{}, ip:{}, port:{}", proxy.getId(), proxy.getIp(),
+ proxy.getPort());
+ continue;
}
+ String refId = proxy.getIp() + ":" + proxy.getPort();
+ hostMap.put(refId, new HostInfo(refId, proxy.getIp(), proxy.getPort()));
+
}
if (hostMap.isEmpty()) {
- logger.error("Parse proxyList failure: address is empty for response from manager!");
+ LOGGER.error("Parse proxyList failure: address is empty for response from manager!");
return null;
}
return hostMap;
@@ -768,9 +689,9 @@ public class ProxyConfigManager extends Thread {
}
/* Request new configurations from Manager. */
- private JsonObject requestConfiguration(String url, List<BasicNameValuePair> params) {
+ private String requestConfiguration(String url, List<BasicNameValuePair> params) {
if (Utils.isBlank(url)) {
- logger.error("request url is null");
+ LOGGER.error("request url is null");
return null;
}
// get local managerIpList
@@ -789,7 +710,7 @@ public class ProxyConfigManager extends Thread {
try {
httpClient = getCloseableHttpClient(params);
} catch (Throwable eHttps) {
- logger.error("Create Https cliet failure, error 1 is ", eHttps);
+ LOGGER.error("Create Https cliet failure, error 1 is ", eHttps);
eHttps.printStackTrace();
return null;
}
@@ -805,7 +726,7 @@ public class ProxyConfigManager extends Thread {
}
tryIdx++;
- logger.info("Request url : " + url + ", localManagerIps : " + localManagerIps);
+ LOGGER.info("Request url : " + url + ", localManagerIps : " + localManagerIps);
try {
httpPost = new HttpPost(url);
if (this.clientConfig.isNeedAuthentication()) {
@@ -821,16 +742,15 @@ public class ProxyConfigManager extends Thread {
HttpResponse response = httpClient.execute(httpPost);
returnStr = EntityUtils.toString(response.getEntity());
if (Utils.isNotBlank(returnStr) && response.getStatusLine().getStatusCode() == 200) {
- logger.info("Get configure from manager is " + returnStr);
- JsonObject jsonRes = jsonParser.parse(returnStr).getAsJsonObject();
- return jsonRes;
+ LOGGER.info("Get configure from manager is " + returnStr);
+ return returnStr;
}
if (!clientConfig.isLocalVisit()) {
return null;
}
} catch (Throwable e) {
- logger.error("Connect Manager error, message: {}, url is {}", e.getMessage(), url);
+ LOGGER.error("Connect Manager error, message: {}, url is {}", e.getMessage(), url);
if (!clientConfig.isLocalVisit()) {
return null;
@@ -887,7 +807,7 @@ public class ProxyConfigManager extends Thread {
byte[] serialized;
serialized = FileUtils.readFileToByteArray(localManagerIpsFile);
if (serialized == null) {
- logger.error("Local managerIp file is empty, file path : "
+ LOGGER.error("Local managerIp file is empty, file path : "
+ clientConfig.getManagerIpLocalPath());
return null;
}
@@ -897,12 +817,12 @@ public class ProxyConfigManager extends Thread {
localManagerIpsFile.getParentFile().mkdirs();
}
localManagerIps = "";
- logger.error("Get local managerIpList not exist, file path : "
+ LOGGER.error("Get local managerIpList not exist, file path : "
+ clientConfig.getManagerIpLocalPath());
}
} catch (Throwable t) {
localManagerIps = "";
- logger.error("Get local managerIpList occur exception,", t);
+ LOGGER.error("Get local managerIpList occur exception,", t);
}
return localManagerIps;
}
diff --git a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
index de6920993..5f2c72ebc 100644
--- a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
+++ b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/network/Sender.java
@@ -65,7 +65,7 @@ public class Sender {
private final ProxyClientConfig configure;
private final boolean isFile;
private final MetricWorkerThread metricWorker;
- private String clusterId;
+ private int clusterId = -1;
public Sender(ProxyClientConfig configure) throws Exception {
this(configure, null);
@@ -696,11 +696,11 @@ public class Sender {
callbacks.clear();
}
- public String getClusterId() {
+ public int getClusterId() {
return clusterId;
}
- public void setClusterId(String clusterId) {
+ public void setClusterId(int clusterId) {
this.clusterId = clusterId;
}
diff --git a/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
new file mode 100644
index 000000000..d6817eb7a
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk/src/test/java/org/apache/inlong/sdk/dataproxy/ProxyConfigManagerTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.sdk.dataproxy;
+
+import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
+import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
+import org.apache.inlong.sdk.dataproxy.network.ClientMgr;
+import org.junit.Assert;
+import org.junit.Test;
+import org.powermock.api.mockito.PowerMockito;
+
+import java.net.URISyntaxException;
+import java.nio.file.Paths;
+import java.util.Objects;
+
+public class ProxyConfigManagerTest {
+
+ private final String localFile = Paths.get(
+ Objects.requireNonNull(this.getClass().getClassLoader().getResource("proxylist.json")).toURI())
+ .toString();
+ private final ProxyClientConfig clientConfig = PowerMockito.mock(ProxyClientConfig.class);
+ private final ClientMgr clientMgr = PowerMockito.mock(ClientMgr.class);
+ private final ProxyConfigManager proxyConfigManager = new ProxyConfigManager(clientConfig, "127.0.0.1",
+ clientMgr);
+
+ public ProxyConfigManagerTest() throws URISyntaxException {
+ }
+ /** Loads the proxylist.json test resource and checks the fields of the parsed entry. */
+ @Test
+ public void testProxyConfigParse() throws Exception {
+ ProxyConfigEntry proxyEntry = proxyConfigManager.getLocalProxyListFromFile(localFile);
+ Assert.assertFalse(proxyEntry.isInterVisit());
+ Assert.assertEquals(12, proxyEntry.getLoad());
+ Assert.assertEquals(1, proxyEntry.getClusterId());
+ Assert.assertEquals(2, proxyEntry.getSize());
+ Assert.assertTrue(proxyEntry.getHostMap().containsKey("127.0.0.1:46801"));
+ Assert.assertFalse(proxyEntry.getHostMap().containsKey("127.0.0.1:8080"));
+ }
+
+}
diff --git a/inlong-sdk/dataproxy-sdk/src/test/resources/proxylist.json b/inlong-sdk/dataproxy-sdk/src/test/resources/proxylist.json
new file mode 100644
index 000000000..ffcc6efa6
--- /dev/null
+++ b/inlong-sdk/dataproxy-sdk/src/test/resources/proxylist.json
@@ -0,0 +1,18 @@
+{
+ "clusterId": 1,
+ "load": 12,
+ "isIntranet": 0,
+ "isSwitch": 1,
+ "nodeList": [
+ {
+ "id": 1,
+ "ip": "127.0.0.1",
+ "port": 46801
+ },
+ {
+ "id": 2,
+ "ip": "127.0.0.2",
+ "port": 46801
+ }
+ ]
+}
\ No newline at end of file