You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/04/29 02:40:12 UTC
[incubator-doris] branch master updated: [refactor][routineload] Remove unused client object from routine load (#9223)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 93a41b2625 [refactor][routineload] Remove unused client object from routine load (#9223)
93a41b2625 is described below
commit 93a41b2625af74255487b3ac439ec293b9682ee0
Author: xy720 <22...@users.noreply.github.com>
AuthorDate: Fri Apr 29 10:40:07 2022 +0800
[refactor][routineload] Remove unused client object from routine load (#9223)
---
.../org/apache/doris/common/util/KafkaUtil.java | 26 ----------------------
1 file changed, 26 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
index ecbb897ab5..3739ab22cf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/util/KafkaUtil.java
@@ -18,14 +18,12 @@
package org.apache.doris.common.util;
import org.apache.doris.catalog.Catalog;
-import org.apache.doris.common.ClientPool;
import org.apache.doris.common.LoadException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.proto.InternalService;
import org.apache.doris.rpc.BackendServiceProxy;
import org.apache.doris.system.Backend;
-import org.apache.doris.thrift.BackendService;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TStatusCode;
@@ -47,9 +45,7 @@ public class KafkaUtil {
public static List<Integer> getAllKafkaPartitions(String brokerList, String topic,
Map<String, String> convertedCustomProperties) throws UserException {
- BackendService.Client client = null;
TNetworkAddress address = null;
- boolean ok = false;
try {
List<Long> backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true);
if (backendIds.isEmpty()) {
@@ -89,12 +85,6 @@ public class KafkaUtil {
LOG.warn("failed to get partitions.", e);
throw new LoadException(
"Failed to get all partitions of kafka topic: " + topic + ". error: " + e.getMessage());
- } finally {
- if (ok) {
- ClientPool.backendPool.returnObject(address, client);
- } else {
- ClientPool.backendPool.invalidateObject(address, client);
- }
}
}
@@ -104,10 +94,8 @@ public class KafkaUtil {
public static List<Pair<Integer, Long>> getOffsetsForTimes(String brokerList, String topic,
Map<String, String> convertedCustomProperties,
List<Pair<Integer, Long>> timestampOffsets) throws LoadException {
- BackendService.Client client = null;
TNetworkAddress address = null;
LOG.debug("begin to get offsets for times of topic: {}, {}", topic, timestampOffsets);
- boolean ok = false;
try {
List<Long> backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true);
if (backendIds.isEmpty()) {
@@ -159,23 +147,15 @@ public class KafkaUtil {
LOG.warn("failed to get offsets for times.", e);
throw new LoadException(
"Failed to get offsets for times of kafka topic: " + topic + ". error: " + e.getMessage());
- } finally {
- if (ok) {
- ClientPool.backendPool.returnObject(address, client);
- } else {
- ClientPool.backendPool.invalidateObject(address, client);
- }
}
}
public static List<Pair<Integer, Long>> getLatestOffsets(long jobId, UUID taskId, String brokerList, String topic,
Map<String, String> convertedCustomProperties,
List<Integer> partitionIds) throws LoadException {
- BackendService.Client client = null;
TNetworkAddress address = null;
LOG.debug("begin to get latest offsets for partitions {} in topic: {}, task {}, job {}",
partitionIds, topic, taskId, jobId);
- boolean ok = false;
try {
List<Long> backendIds = Catalog.getCurrentSystemInfo().getBackendIds(true);
if (backendIds.isEmpty()) {
@@ -226,12 +206,6 @@ public class KafkaUtil {
LOG.warn("failed to get latest offsets.", e);
throw new LoadException(
"Failed to get latest offsets of kafka topic: " + topic + ". error: " + e.getMessage());
- } finally {
- if (ok) {
- ClientPool.backendPool.returnObject(address, client);
- } else {
- ClientPool.backendPool.invalidateObject(address, client);
- }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org